You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xi...@apache.org on 2017/08/24 00:06:19 UTC

[1/4] hadoop git commit: HDFS-10899. Add functionality to re-encrypt EDEKs.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 26d8c8fa5 -> 1000a2af0


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
new file mode 100644
index 0000000..f0dd92c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.KMSUtil;
+import org.apache.hadoop.util.StopWatch;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY;
+import static org.junit.Assert.fail;
+
+/**
+ * Test class for ReencryptionHandler.
+ */
+public class TestReencryptionHandler {
+
+  protected static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(TestReencryptionHandler.class);
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(180 * 1000);
+
+  @Before
+  public void setup() {
+    GenericTestUtils.setLogLevel(ReencryptionHandler.LOG, Level.TRACE);
+  }
+
+  private ReencryptionHandler mockReencryptionhandler(final Configuration conf)
+      throws IOException {
+    // mock stuff to create a mocked ReencryptionHandler
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        JavaKeyStoreProvider.SCHEME_NAME + "://file" + new Path(
+            new FileSystemTestHelper().getTestRootDir(), "test.jks").toUri());
+    final EncryptionZoneManager ezm = Mockito.mock(EncryptionZoneManager.class);
+    final KeyProvider kp = KMSUtil.createKeyProvider(conf,
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    Mockito.when(ezm.getProvider()).thenReturn(
+        KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp));
+    return new ReencryptionHandler(ezm, conf);
+  }
+
+  @Test
+  public void testThrottle() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
+        0.5);
+    final ReencryptionHandler rh = mockReencryptionhandler(conf);
+
+    // mock StopWatches so all = 30s, locked = 20s. With ratio = .5 the locked
+    // time may be at most half of the total, so throttle should wait until
+    // total reaches 20 / 0.5 = 40s, i.e. about 10s (asserted as 8s-12s).
+    final StopWatch mockAll = Mockito.mock(StopWatch.class);
+    Mockito.when(mockAll.now(TimeUnit.MILLISECONDS)).thenReturn(30000L);
+    Mockito.when(mockAll.reset()).thenReturn(mockAll);
+    final StopWatch mockLocked = Mockito.mock(StopWatch.class);
+    Mockito.when(mockLocked.now(TimeUnit.MILLISECONDS)).thenReturn(20000L);
+    Mockito.when(mockLocked.reset()).thenReturn(mockLocked);
+    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+    Whitebox.setInternalState(rh, "throttleTimerAll", mockAll);
+    Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked);
+    Whitebox.setInternalState(rh, "taskQueue", queue);
+    final StopWatch sw = new StopWatch().start();
+    rh.throttle();
+    sw.stop();
+    assertTrue("should have throttled for at least 8 seconds",
+        sw.now(TimeUnit.MILLISECONDS) > 8000);
+    assertTrue("should have throttled for at most 12 seconds",
+        sw.now(TimeUnit.MILLISECONDS) < 12000);
+  }
+
+  @Test
+  public void testThrottleNoOp() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
+        0.5);
+    final ReencryptionHandler rh = mockReencryptionhandler(conf);
+
+    // mock the millisecond readings so all = 30s, locked = 10s. With
+    // ratio = .5, locked is below half of all, so throttle should not happen.
+    StopWatch mockAll = Mockito.mock(StopWatch.class);
+    Mockito.when(mockAll.now(TimeUnit.MILLISECONDS)).thenReturn(30000L);
+    Mockito.when(mockAll.reset()).thenReturn(mockAll);
+    StopWatch mockLocked = Mockito.mock(StopWatch.class);
+    Mockito.when(mockLocked.now(TimeUnit.MILLISECONDS)).thenReturn(10000L);
+    Mockito.when(mockLocked.reset()).thenReturn(mockLocked);
+    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+    Whitebox.setInternalState(rh, "throttleTimerAll", mockAll);
+    Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked);
+    Whitebox.setInternalState(rh, "taskQueue", queue);
+    final Map<Long, ReencryptionUpdater.ZoneSubmissionTracker>
+        submissions = new HashMap<>();
+    Whitebox.setInternalState(rh, "submissions", submissions);
+    StopWatch sw = new StopWatch().start();
+    rh.throttle();
+    sw.stop();
+    assertTrue("should not have throttled",
+        sw.now(TimeUnit.MILLISECONDS) < 1000);
+  }
+
+  @Test
+  public void testThrottleConfigs() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
+        -1.0);
+    try {
+      mockReencryptionhandler(conf);
+      fail("Should not be able to init");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(" is not positive", e);
+    }
+
+    conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
+        0.0);
+    try {
+      mockReencryptionhandler(conf);
+      fail("Should not be able to init");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(" is not positive", e);
+    }
+  }
+
+  @Test
+  public void testThrottleAccumulatingTasks() throws Exception {
+    final Configuration conf = new Configuration();
+    final ReencryptionHandler rh = mockReencryptionhandler(conf);
+
+    // mock tasks piling up
+    final Map<Long, ReencryptionUpdater.ZoneSubmissionTracker>
+        submissions = new HashMap<>();
+    final ReencryptionUpdater.ZoneSubmissionTracker zst =
+        new ReencryptionUpdater.ZoneSubmissionTracker();
+    submissions.put(new Long(1), zst);
+    Future mock = Mockito.mock(Future.class);
+    for (int i = 0; i < Runtime.getRuntime().availableProcessors() * 3; ++i) {
+      zst.addTask(mock);
+    }
+
+    Thread removeTaskThread = new Thread() {
+      public void run() {
+        try {
+          Thread.sleep(3000);
+        } catch (InterruptedException ie) {
+          LOG.info("removeTaskThread interrupted.");
+          Thread.currentThread().interrupt();
+        }
+        zst.getTasks().clear();
+      }
+    };
+
+    Whitebox.setInternalState(rh, "submissions", submissions);
+    final StopWatch sw = new StopWatch().start();
+    removeTaskThread.start();
+    rh.throttle();
+    sw.stop();
+    assertTrue("should have throttled for at least 3 second",
+        sw.now(TimeUnit.MILLISECONDS) > 3000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCryptoConf.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCryptoConf.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCryptoConf.xml
index f9bb29e..a6980aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCryptoConf.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCryptoConf.xml
@@ -603,5 +603,85 @@
         </comparator>
       </comparators>
     </test>
+
+    <!--More thorough test cases for re-encryption are in TestReencryption-->
+    <test>
+      <description>Test success of reencrypt submission on a EZ</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /src</command>
+        <crypto-admin-command>-createZone -path /src -keyName myKey</crypto-admin-command>
+        <crypto-admin-command>-reencryptZone -start -path /src</crypto-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r -skipTrash /src</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>successfully submitted for zone: /src action: START</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test>
+      <description>Test failure of reencrypt submission on a non-EZ</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /src</command>
+        <crypto-admin-command>-reencryptZone -start -path /src</crypto-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r -skipTrash /src</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>not the root of an encryption zone</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <!-- Cannot test successful cancel here, since the submit will finish very quickly.
+         TestReencryption covers successful cancellations. -->
+
+    <test>
+      <description>Test failure of reencrypt cancellation on a not-being-re-encrypted EZ</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /src</command>
+        <crypto-admin-command>-createZone -path /src -keyName myKey</crypto-admin-command>
+        <crypto-admin-command>-reencryptZone -cancel -path /src</crypto-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r -skipTrash /src</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>Zone /src is not under re-encryption</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test>
+      <description>Test success of list reencrypt status on a EZ</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /src</command>
+        <command>-fs NAMENODE -mkdir /zone2</command>
+        <crypto-admin-command>-createZone -path /src -keyName myKey</crypto-admin-command>
+        <crypto-admin-command>-createZone -path /zone2 -keyName myKey</crypto-admin-command>
+        <crypto-admin-command>-reencryptZone -start -path /src</crypto-admin-command>
+        <crypto-admin-command>-reencryptZone -start -path /zone2</crypto-admin-command>
+        <crypto-admin-command>-listReencryptionStatus</crypto-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r -skipTrash /src</command>
+        <command>-fs NAMENODE -rm -r -skipTrash /zone2</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>TokenComparator</type>
+          <expected-output>/src,Completed,false,myKey,zone2</expected-output>
+        </comparator>
+      </comparators>
+    </test>
   </tests>
 </configuration>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/4] hadoop git commit: HDFS-10899. Add functionality to re-encrypt EDEKs.

Posted by xi...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CryptoAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CryptoAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CryptoAdmin.java
index 14abf6e..4b0b083 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CryptoAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CryptoAdmin.java
@@ -32,8 +32,11 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.tools.TableListing;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -286,10 +289,139 @@ public class CryptoAdmin extends Configured implements Tool {
     }
   }
 
+  private static class ReencryptZoneCommand implements AdminHelper.Command {
+    @Override
+    public String getName() {
+      return "-reencryptZone";
+    }
+
+    @Override
+    public String getShortUsage() {
+      return "[" + getName() + " <action> -path <zone>]\n";
+    }
+
+    @Override
+    public String getLongUsage() {
+      final TableListing listing = AdminHelper.getOptionDescriptionListing();
+      listing.addRow("<action>",
+          "The re-encrypt action to perform. Must be -start or -cancel.");
+      listing.addRow("<zone>", "The path to the zone to be re-encrypted.");
+      return getShortUsage() + "\n" + "Issue a re-encryption command for"
+          + " an encryption zone. Requires superuser permissions.\n\n"
+          + listing.toString();
+    }
+
+    @Override
+    public int run(Configuration conf, List<String> args) throws IOException {
+      // Returns 0 on success, 1 for an unrecognized argument, 2 unless exactly
+      // one of -start/-cancel is given, 3 without -path, 4 on IOException.
+      final String path = StringUtils.popOptionWithArgument("-path", args);
+      final boolean start = StringUtils.popOption("-start", args);
+      final boolean cancel = StringUtils.popOption("-cancel", args);
+
+      if (!args.isEmpty()) {
+        System.err.println("Can't understand argument: " + args.get(0));
+        System.err.println(getLongUsage());
+        return 1;
+      }
+      if (start == cancel) {
+        System.err.println("You must specify either [-start] or [-cancel]. ");
+        System.err.println(getLongUsage());
+        return 2;
+      }
+      if (path == null) {
+        System.err.println("You must specify a zone directory with [-path]. ");
+        System.err.println(getLongUsage());
+        return 3;
+      }
+      final ReencryptAction action =
+          cancel ? ReencryptAction.CANCEL : ReencryptAction.START;
+
+      final HdfsAdmin admin =
+          new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+      try {
+        admin.reencryptEncryptionZone(new Path(path), action);
+        System.out.println("re-encrypt command successfully submitted for "
+            + "zone: " + path + " action: " + action);
+      } catch (IOException e) {
+        System.err.println(prettifyException(e));
+        return 4;
+      }
+      return 0;
+    }
+  }
+
+  private static class ListReencryptionStatusCommand
+      implements AdminHelper.Command {
+    @Override
+    public String getName() {
+      return "-listReencryptionStatus";
+    }
+
+    @Override
+    public String getShortUsage() {
+      return "[" + getName()+ "]\n";
+    }
+
+    @Override
+    public String getLongUsage() {
+      return getShortUsage() + "\n" +
+          "List re-encryption statuses of encryption zones. "
+          + "Requires superuser permissions.\n\n";
+    }
+
+    @Override
+    public int run(Configuration conf, List<String> args) throws IOException {
+      HdfsAdmin admin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+      try {
+        final TableListing listing =
+            new TableListing.Builder().addField("Zone Name").addField("Status")
+                .addField("EZKey Version Name").addField("Submission Time")
+                .addField("Is Canceled?").addField("Completion Time")
+                .addField("Number of files re-encrypted")
+                .addField("Number of failures")
+                .addField("Last File Checkpointed")
+                .wrapWidth(AdminHelper.MAX_LINE_WIDTH).showHeaders().build();
+        final RemoteIterator<ZoneReencryptionStatus> it =
+            admin.listReencryptionStatus();
+        boolean failuresMet = false;
+        while (it.hasNext()) {
+          ZoneReencryptionStatus zs = it.next();
+          final long completion = zs.getCompletionTime();
+          listing.addRow(zs.getZoneName(), zs.getState().toString(),
+              zs.getEzKeyVersionName(), Time.formatTime(zs.getSubmissionTime()),
+              Boolean.toString(zs.isCanceled()),
+              completion == 0 ? "N/A" : Time.formatTime(completion),
+              Long.toString(zs.getFilesReencrypted()),
+              Long.toString(zs.getNumReencryptionFailures()),
+              zs.getLastCheckpointFile());
+          if (zs.getNumReencryptionFailures() > 0) {
+            failuresMet = true;
+          }
+        }
+        System.out.println(listing.toString());
+        if (failuresMet) {
+          System.out.println("There are re-encryption failures. Files that are"
+              + " failed to re-encrypt are still using the old EDEKs. "
+              + "Please check NameNode log to see which files failed,"
+              + " then either fix the error and re-encrypt again,"
+              + " or manually copy the failed files to use new EDEKs.");
+        }
+      } catch (IOException e) {
+        System.err.println(prettifyException(e));
+        return 2;
+      }
+
+      return 0;
+    }
+  }
+
   private static final AdminHelper.Command[] COMMANDS = {
       new CreateZoneCommand(),
       new ListZonesCommand(),
       new ProvisionTrashCommand(),
-      new GetFileEncryptionInfoCommand()
+      new GetFileEncryptionInfoCommand(),
+      new ReencryptZoneCommand(),
+      new ListReencryptionStatusCommand()
   };
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 03becc9..aedc7e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2800,6 +2800,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.list.reencryption.status.num.responses</name>
+  <value>100</value>
+  <description>When listing re-encryption status, the maximum number of zones
+    that will be returned in a batch. Fetching the list incrementally in
+    batches improves namenode performance.
+  </description>
+</property>
+
   <property>
     <name>dfs.namenode.list.openfiles.num.responses</name>
     <value>1000</value>
@@ -2829,6 +2838,49 @@
 </property>
 
 <property>
+  <name>dfs.namenode.reencrypt.sleep.interval</name>
+  <value>1m</value>
+  <description>Interval the re-encrypt EDEK thread sleeps in the main loop. The
+    interval accepts units. If none given, millisecond is assumed.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.reencrypt.batch.size</name>
+  <value>1000</value>
+  <description>How many EDEKs should the re-encrypt thread process in one batch.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.reencrypt.throttle.limit.handler.ratio</name>
+  <value>1.0</value>
+  <description>Throttling ratio for the re-encryption, indicating what fraction
+    of time should the re-encrypt handler thread work under NN read lock.
+    Larger than 1.0 values are interpreted as 1.0. Negative value or 0 are
+    invalid values and will fail NN startup.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.reencrypt.throttle.limit.updater.ratio</name>
+  <value>1.0</value>
+  <description>Throttling ratio for the re-encryption, indicating what fraction
+    of time should the re-encrypt updater thread work under NN write lock.
+    Larger than 1.0 values are interpreted as 1.0. Negative value or 0 are
+    invalid values and will fail NN startup.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.reencrypt.edek.threads</name>
+  <value>10</value>
+  <description>Maximum number of re-encrypt threads to contact the KMS
+    and re-encrypt the edeks.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.inotify.max.events.per.rpc</name>
   <value>1000</value>
   <description>Maximum number of events that will be sent to an inotify client

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/TransparentEncryption.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/TransparentEncryption.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/TransparentEncryption.md
index 3f9fbf0..3454265 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/TransparentEncryption.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/TransparentEncryption.md
@@ -177,6 +177,33 @@ Get encryption information from a file. This can be used to find out whether a f
 |:---- |:---- |
 | *path* | The path of the file to get encryption information. |
 
+### <a name="reencryptZone"></a>reencryptZone
+
+Usage: `[-reencryptZone <action> -path <zone>]`
+
+Re-encrypts an encryption zone, by iterating through the encryption zone, and calling the KeyProvider's reencryptEncryptedKeys interface to batch-re-encrypt all files' EDEKs with the latest version encryption zone key in the key provider. Requires superuser permissions.
+
+Note that re-encryption does not apply to snapshots, due to snapshots' immutable nature.
+
+| | |
+|:---- |:---- |
+| *action* | The re-encrypt action to perform. Must be either `-start` or `-cancel`. |
+| *path* | The path to the root of the encryption zone. |
+
+Re-encryption is a NameNode-only operation in HDFS, so it could potentially put an intensive load on the NameNode. The following configurations can be changed to control the stress on the NameNode, depending on the acceptable throughput impact to the cluster.
+
+| | |
+|:---- |:---- |
+| *dfs.namenode.reencrypt.batch.size* | The number of EDEKs in a batch to be sent to the KMS for re-encryption. Each batch is processed when holding the name system read/write lock, with throttling happening between batches. See configs below. |
+| *dfs.namenode.reencrypt.throttle.limit.handler.ratio* | Ratio of read locks to be held during re-encryption. 1.0 means no throttling. 0.5 means re-encryption can hold the readlock at most 50% of its total processing time. Negative value or 0 are invalid. |
+| *dfs.namenode.reencrypt.throttle.limit.updater.ratio* | Ratio of write locks to be held during re-encryption. 1.0 means no throttling. 0.5 means re-encryption can hold the writelock at most 50% of its total processing time. Negative value or 0 are invalid. |
+
+### <a name="listReencryptionStatus"></a>listReencryptionStatus
+
+Usage: `[-listReencryptionStatus]`
+
+List re-encryption information for all encryption zones. Requires superuser permissions.
+
 <a name="Example_usage"></a>Example usage
 -------------
 
@@ -282,4 +309,20 @@ These exploits assume that the attacker has compromised HDFS, but does not have
 
 ### <a name="Rogue_user_exploits"></a>Rogue user exploits
 
-A rogue user can collect keys of files they have access to, and use them later to decrypt the encrypted data of those files. As the user had access to those files, they already had access to the file contents. This can be mitigated through periodic key rolling policies.
+A rogue user can collect keys of files they have access to, and use them later to decrypt the encrypted data of those files. As the user had access to those files, they already had access to the file contents. This can be mitigated through periodic key rolling policies. The [reencryptZone](#reencryptZone) command is usually required after key rolling, to make sure the EDEKs on existing files use the new version key.
+
+Manual steps for a complete key rolling and re-encryption are listed below. These instructions assume that you are running as the key admin or HDFS superuser as is appropriate.
+
+    # As the key admin, roll the key to a new version
+    hadoop key roll exposedKey
+
+    # As the super user, re-encrypt the encryption zone. Possibly list zones first.
+    hdfs crypto -listZones
+    hdfs crypto -reencryptZone -start -path /zone
+
+    # As the super user, periodically check the status of re-encryption
+    hdfs crypto -listReencryptionStatus
+
+    # As the super user, get encryption information from the file and double check its encryption key version
+    hdfs crypto -getFileEncryptionInfo -path /zone/helloWorld
+    # console output: {cipherSuite: {name: AES/CTR/NoPadding, algorithmBlockSize: 16}, cryptoProtocolVersion: CryptoProtocolVersion{description='Encryption zones', version=2, unknownValue=null}, edek: 2010d301afbd43b58f10737ce4e93b39, iv: ade2293db2bab1a2e337f91361304cb3, keyName: exposedKey, ezKeyVersionName: exposedKey@1}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
new file mode 100644
index 0000000..7ba3f91
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
@@ -0,0 +1,1847 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Supplier;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileContextTestWrapper;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FileSystemTestWrapper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.rules.Timeout;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+/**
+ * Test class for re-encryption.
+ */
+public class TestReencryption {
+
+  protected static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(TestReencryption.class);
+
+  // Name of the key created in setup() and used for the test zones.
+  private static final String TEST_KEY = "test_key";
+  // Flags passed to createEncryptionZone by the tests.
+  private static final EnumSet<CreateEncryptionZoneFlag> NO_TRASH =
+      EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
+
+  // Per-test fixtures, assigned in setup().
+  private Configuration conf;
+  private FileSystemTestHelper fsHelper;
+  private File testRootDir;
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private FSNamesystem fsn;
+  private HdfsAdmin dfsAdmin;
+
+  private FileSystemTestWrapper fsWrapper;
+  private FileContextTestWrapper fcWrapper;
+
+  private String getKeyProviderURI() {
+    return JavaKeyStoreProvider.SCHEME_NAME + "://file" + new Path(
+        testRootDir.toString(), "test.jks").toUri();
+  }
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(180 * 1000); // 3 min, in millis
+
+  /**
+   * Starts a single-DataNode MiniDFSCluster whose key provider is the Java
+   * key store returned by getKeyProviderURI(), creates TEST_KEY, and turns
+   * on TRACE logging for the re-encryption related classes.
+   */
+  @Before
+  public void setup() throws Exception {
+    conf = new HdfsConfiguration();
+    fsHelper = new FileSystemTestHelper();
+
+    // Java key store lives under the test root dir.
+    testRootDir = new File(fsHelper.getTestRootDir()).getAbsoluteFile();
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        getKeyProviderURI());
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+
+    // Lowered for testing: the list-encryption-zones batch size, the listing
+    // limit, and the re-encrypt batch size / sleep interval.
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES, 2);
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 3);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY, 5);
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY, 1,
+        TimeUnit.SECONDS);
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fsn = cluster.getNamesystem();
+    fsWrapper = new FileSystemTestWrapper(fs);
+    final FileContext fc = FileContext.getFileContext(cluster.getURI(), conf);
+    fcWrapper = new FileContextTestWrapper(fc);
+    dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
+    setProvider();
+
+    // Key used by the encryption zones of every test.
+    DFSTestUtil.createKey(TEST_KEY, cluster, conf);
+
+    GenericTestUtils.setLogLevel(EncryptionZoneManager.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(ReencryptionHandler.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(ReencryptionStatus.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(ReencryptionUpdater.LOG, Level.TRACE);
+  }
+
+  private void setProvider() {
+    // Need to set the client's KeyProvider to the NN's for JKS,
+    // else the updates do not get flushed properly
+    fs.getClient()
+        .setKeyProvider(cluster.getNameNode().getNamesystem().getProvider());
+  }
+
+  /**
+   * Shuts down the cluster (if one was started) and resets the static
+   * EncryptionFaultInjector instance to a fresh default.
+   */
+  @After
+  public void teardown() {
+    try {
+      if (cluster != null) {
+        cluster.shutdown();
+        cluster = null;
+      }
+    } finally {
+      // Reset the injector even when shutdown() throws, so that an injector
+      // left in the static field cannot affect the following tests.
+      EncryptionFaultInjector.instance = new EncryptionFaultInjector();
+    }
+  }
+
+  private FileEncryptionInfo getFileEncryptionInfo(Path path) throws Exception {
+    return fsn.getFileInfo(path.toString(), false).getFileEncryptionInfo();
+  }
+
+  @Test
+  public void testReencryptionBasic() throws Exception {
+    /* Setup test dir:
+     * /zones/zone/[0-9]
+     * /dir/f
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+    final Path subdir = new Path("/dir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
+
+    // test re-encrypt without keyroll
+    final Path encFile1 = new Path(zone, "0");
+    final FileEncryptionInfo fei0 = getFileEncryptionInfo(encFile1);
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(1);
+    assertKeyVersionEquals(encFile1, fei0);
+    // key not rolled, so no edeks need to be updated.
+    verifyZoneStatus(zone, null, 0);
+
+    // test re-encrypt after keyroll
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(2);
+    FileEncryptionInfo fei1 = getFileEncryptionInfo(encFile1);
+    assertKeyVersionChanged(encFile1, fei0);
+
+    // test listReencryptionStatus
+    RemoteIterator<ZoneReencryptionStatus> it =
+        dfsAdmin.listReencryptionStatus();
+    assertTrue(it.hasNext());
+    ZoneReencryptionStatus zs = it.next();
+    assertEquals(zone.toString(), zs.getZoneName());
+    assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
+    assertTrue(zs.getCompletionTime() > 0);
+    assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
+    assertNotEquals(fei0.getEzKeyVersionName(), zs.getEzKeyVersionName());
+    assertEquals(fei1.getEzKeyVersionName(), zs.getEzKeyVersionName());
+    assertEquals(10, zs.getFilesReencrypted());
+
+    // test re-encrypt on same zone again
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(3);
+    assertKeyVersionEquals(encFile1, fei1);
+
+    // test non-EZ submission
+    try {
+      dfsAdmin.reencryptEncryptionZone(subdir, ReencryptAction.START);
+      fail("Re-encrypting non-EZ should fail");
+    } catch (RemoteException expected) {
+      LOG.info("Expected exception caught.", expected);
+      assertExceptionContains("not the root of an encryption zone", expected);
+    }
+
+    // test non-existing dir
+    try {
+      dfsAdmin.reencryptEncryptionZone(new Path(zone, "notexist"),
+          ReencryptAction.START);
+      fail("Re-encrypting non-existing dir should fail");
+    } catch (RemoteException expected) {
+      LOG.info("Expected exception caught.", expected);
+      assertTrue(
+          expected.unwrapRemoteException() instanceof FileNotFoundException);
+    }
+
+    // test directly on a EZ file
+    try {
+      dfsAdmin.reencryptEncryptionZone(encFile1, ReencryptAction.START);
+      fail("Re-encrypting on a file should fail");
+    } catch (RemoteException expected) {
+      LOG.info("Expected exception caught.", expected);
+      assertExceptionContains("not the root of an encryption zone", expected);
+    }
+
+    // test same command resubmission
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+    try {
+      dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    } catch (RemoteException expected) {
+      LOG.info("Expected exception caught.", expected);
+      assertExceptionContains("already submitted", expected);
+    }
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedZones(4);
+
+    // test empty EZ
+    final Path emptyZone = new Path("/emptyZone");
+    fsWrapper.mkdir(emptyZone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(emptyZone, TEST_KEY, NO_TRASH);
+
+    dfsAdmin.reencryptEncryptionZone(emptyZone, ReencryptAction.START);
+    waitForReencryptedZones(5);
+
+    dfsAdmin.reencryptEncryptionZone(emptyZone, ReencryptAction.START);
+    waitForReencryptedZones(6);
+
+    // test rename ez and listReencryptionStatus
+    final Path renamedZone = new Path("/renamedZone");
+    fsWrapper.rename(zone, renamedZone);
+    it = dfsAdmin.listReencryptionStatus();
+    assertTrue(it.hasNext());
+    zs = it.next();
+    assertEquals(renamedZone.toString(), zs.getZoneName());
+  }
+
+  @Test
+  public void testReencryptOrdering() throws Exception {
+    /* Checks the order in which the files of a zone get re-encrypted: once
+     * the first 5 files are reported as re-encrypted, /zones/zone/dir/f is
+     * expected to have a changed key version while /zones/zone/f0 is
+     * expected to still have its original one.
+     *
+     * Setup dir as follows:
+     * /zones/zone/[0-3]
+     * /zones/zone/dir/f
+     * /zones/zone/f[0-4]
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    Path subdir = new Path(zone, "dir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
+    for (int i = 0; i < 4; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+    for (int i = 0; i < 5; ++i) {
+      DFSTestUtil.createFile(fs, new Path(zone, "f" + Integer.toString(i)), len,
+          (short) 1, 0xFEED);
+    }
+
+    // /zones/zone/f[0-4] should be re-encrypted after /zones/zone/dir/f
+    final Path lastReencryptedFile = new Path(subdir, "f");
+    final Path notReencrypted = new Path(zone, "f0");
+    final FileEncryptionInfo fei = getFileEncryptionInfo(lastReencryptedFile);
+    final FileEncryptionInfo feiLast = getFileEncryptionInfo(notReencrypted);
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // mark pause after first checkpoint (5 files)
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedFiles(zone.toString(), 5);
+    assertKeyVersionChanged(lastReencryptedFile, fei);
+    assertKeyVersionEquals(notReencrypted, feiLast);
+  }
+
+  @Test
+  public void testDeleteDuringReencrypt() throws Exception {
+    /* Deletes an encryption zone whose re-encrypt command is queued while
+     * re-encryption is paused, resumes, and expects no status to be left
+     * for the zone.
+     */
+    final Path ezRoot = new Path(new Path("/zones"), "zone");
+    final String ezName = ezRoot.toString();
+    final int fileLength = 8196;
+    fsWrapper.mkdir(ezRoot, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(ezRoot, TEST_KEY, NO_TRASH);
+    for (int idx = 0; idx < 10; idx++) {
+      final Path file = new Path(ezRoot, Integer.toString(idx));
+      DFSTestUtil.createFile(fs, file, fileLength, (short) 1, 0xFEED);
+    }
+
+    // test zone deleted during re-encrypt: queue the command while paused
+    getEzManager().pauseReencryptForTesting();
+    getEzManager().resetMetricsForTesting();
+    dfsAdmin.reencryptEncryptionZone(ezRoot, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // remove the zone, then let re-encryption run again
+    fs.delete(ezRoot, true);
+    getEzManager().resumeReencryptForTesting();
+    waitForTotalZones(0);
+    assertNull(getZoneStatus(ezName));
+  }
+
+  @Test
+  public void testZoneDeleteDuringReencrypt() throws Exception {
+    /* Deletes the parent dir of an encryption zone while its re-encryption
+     * is paused after the first submission (5 files), resumes, and expects
+     * the zone to be absent from both the zone status lookup and
+     * listReencryptionStatus.
+     */
+    final Path parentDir = new Path("/zones");
+    final Path ezRoot = new Path(parentDir, "zone");
+    final String ezName = ezRoot.toString();
+    final int fileLength = 8196;
+    fsWrapper.mkdir(ezRoot, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(ezRoot, TEST_KEY, NO_TRASH);
+    for (int idx = 0; idx < 10; idx++) {
+      final Path file = new Path(ezRoot, Integer.toString(idx));
+      DFSTestUtil.createFile(fs, file, fileLength, (short) 1, 0xFEED);
+    }
+
+    // roll the zone key to a new version
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+
+    // test zone deleted during re-encrypt's checkpointing
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    getEzManager().resetMetricsForTesting();
+    dfsAdmin.reencryptEncryptionZone(ezRoot, ReencryptAction.START);
+    waitForReencryptedFiles(ezName, 5);
+
+    fs.delete(parentDir, true);
+    getEzManager().resumeReencryptForTesting();
+    waitForTotalZones(0);
+    assertNull(getEzManager().getZoneStatus(ezName));
+
+    // verify zone is cleared
+    final RemoteIterator<ZoneReencryptionStatus> statuses =
+        dfsAdmin.listReencryptionStatus();
+    assertFalse(statuses.hasNext());
+  }
+
+  @Test
+  public void testRestartAfterReencrypt() throws Exception {
+    /* Re-encrypts a zone to completion after a key roll, restarts through
+     * restartClusterDisableReencrypt(), and checks that files 0 and 9 keep
+     * the key versions they had right before the restart and that
+     * getNextUnprocessedZone() is null.
+     *
+     * Setup dir as follows:
+     * /zones
+     * /zones/zone
+     * /zones/zone/[0-9]
+     * /zones/zone/dir
+     * /zones/zone/dir/f
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+    final Path subdir = new Path(zone, "dir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
+
+    final Path encFile0 = new Path(zone, "0");
+    final Path encFile9 = new Path(zone, "9");
+    final FileEncryptionInfo fei0 = getFileEncryptionInfo(encFile0);
+    final FileEncryptionInfo fei9 = getFileEncryptionInfo(encFile9);
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(1);
+
+    assertKeyVersionChanged(encFile0, fei0);
+    assertKeyVersionChanged(encFile9, fei9);
+
+    final FileEncryptionInfo fei0new = getFileEncryptionInfo(encFile0);
+    final FileEncryptionInfo fei9new = getFileEncryptionInfo(encFile9);
+    restartClusterDisableReencrypt();
+
+    assertKeyVersionEquals(encFile0, fei0new);
+    assertKeyVersionEquals(encFile9, fei9new);
+    assertNull("Re-encrypt queue should be empty after restart",
+        getReencryptionStatus().getNextUnprocessedZone());
+  }
+
+  @Test
+  public void testRestartWithRenames() throws Exception {
+    /* Renames a file inside a zone, re-encrypts the zone after a key roll,
+     * then restarts the NameNodes and waits again for one re-encrypted zone.
+     *
+     * Setup dir as follows:
+     * /zones
+     * /zones/zone
+     * /zones/zone/f --> renamed to f1
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    DFSTestUtil.createFile(fs, new Path(zone, "f"), len, (short) 1, 0xFEED);
+    fsWrapper.rename(new Path(zone, "f"), new Path(zone, "f1"));
+
+    // re-encrypt
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(1);
+
+    // make sure NN can successfully restart (rename can load ok with
+    // re-encrypt since they're in correct order)
+    cluster.restartNameNodes();
+    cluster.waitActive();
+
+    waitForReencryptedZones(1);
+  }
+
+  @Test
+  public void testRestartDuringReencrypt() throws Exception {
+    /* Restarts the cluster while a re-encryption is paused after its first
+     * submission (5 files). After the restart the zone must be the next
+     * unprocessed zone and its last checkpoint file must be
+     * /zones/zone/dir1/4; after resuming, files 0 and 9 of dir1 must have
+     * changed key versions and 11 files must be reported as re-encrypted.
+     *
+     * Setup dir as follows:
+     * /zones
+     * /zones/zone
+     * /zones/zone/dir_empty
+     * /zones/zone/dir1/[0-9]
+     * /zones/zone/dir1/dir_empty1
+     * /zones/zone/dir2
+     * /zones/zone/dir2/dir_empty2
+     * /zones/zone/dir2/f
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    fsWrapper
+        .mkdir(new Path(zone, "dir_empty"), FsPermission.getDirDefault(), true);
+    Path subdir = new Path(zone, "dir2");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    fsWrapper
+        .mkdir(new Path(subdir, "dir_empty2"), FsPermission.getDirDefault(),
+            true);
+    DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
+    subdir = new Path(zone, "dir1");
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(subdir, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+    fsWrapper
+        .mkdir(new Path(subdir, "dir_empty1"), FsPermission.getDirDefault(),
+            true);
+
+    final Path encFile0 = new Path(subdir, "0");
+    final Path encFile9 = new Path(subdir, "9");
+    final FileEncryptionInfo fei0 = getFileEncryptionInfo(encFile0);
+    final FileEncryptionInfo fei9 = getFileEncryptionInfo(encFile9);
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // mark pause after first checkpoint (5 files)
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedFiles(zone.toString(), 5);
+
+    restartClusterDisableReencrypt();
+
+    final Long zoneId = fsn.getFSDirectory().getINode(zone.toString()).getId();
+    assertEquals("Re-encrypt should restore to the last checkpoint zone",
+        zoneId, getReencryptionStatus().getNextUnprocessedZone());
+    assertEquals("Re-encrypt should restore to the last checkpoint file",
+        new Path(subdir, "4").toString(),
+        getEzManager().getZoneStatus(zone.toString()).getLastCheckpointFile());
+
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedZones(1);
+    assertKeyVersionChanged(encFile0, fei0);
+    assertKeyVersionChanged(encFile9, fei9);
+    assertNull("Re-encrypt queue should be empty after restart",
+        getReencryptionStatus().getNextUnprocessedZone());
+    assertEquals(11, getZoneStatus(zone.toString()).getFilesReencrypted());
+  }
+
+  @Test
+  public void testRestartAfterReencryptAndCheckpoint() throws Exception {
+    /* Re-encrypts a zone to completion after a key roll, saves the namespace
+     * in safe mode, restarts through restartClusterDisableReencrypt(), and
+     * checks that files 0 and 9 keep the key versions they had before the
+     * restart and that getNextUnprocessedZone() is null.
+     *
+     * /zones/zone/[0-9]
+     * /zones/zone/dir/f
+     */
+    final int fileLength = 8196;
+    final Path ezRoot = new Path(new Path("/zones"), "zone");
+    fsWrapper.mkdir(ezRoot, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(ezRoot, TEST_KEY, NO_TRASH);
+    for (int idx = 0; idx < 10; idx++) {
+      final Path file = new Path(ezRoot, Integer.toString(idx));
+      DFSTestUtil.createFile(fs, file, fileLength, (short) 1, 0xFEED);
+    }
+    final Path nestedDir = new Path(ezRoot, "dir");
+    fsWrapper.mkdir(nestedDir, FsPermission.getDirDefault(), true);
+    DFSTestUtil.createFile(fs, new Path(nestedDir, "f"), fileLength,
+        (short) 1, 0xFEED);
+
+    // remember the encryption info of the first and last file
+    final Path firstFile = new Path(ezRoot, "0");
+    final Path lastFile = new Path(ezRoot, "9");
+    final FileEncryptionInfo firstBeforeRoll = getFileEncryptionInfo(firstFile);
+    final FileEncryptionInfo lastBeforeRoll = getFileEncryptionInfo(lastFile);
+
+    // roll the key and re-encrypt the whole zone
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    dfsAdmin.reencryptEncryptionZone(ezRoot, ReencryptAction.START);
+    waitForReencryptedZones(1);
+    assertKeyVersionChanged(firstFile, firstBeforeRoll);
+    assertKeyVersionChanged(lastFile, lastBeforeRoll);
+
+    // save the namespace, then restart
+    final FileEncryptionInfo firstAfterRoll = getFileEncryptionInfo(firstFile);
+    final FileEncryptionInfo lastAfterRoll = getFileEncryptionInfo(lastFile);
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+    restartClusterDisableReencrypt();
+
+    assertKeyVersionEquals(firstFile, firstAfterRoll);
+    assertKeyVersionEquals(lastFile, lastAfterRoll);
+    assertNull("Re-encrypt queue should be empty after restart",
+        getReencryptionStatus().getNextUnprocessedZone());
+  }
+
+  @Test
+  public void testReencryptLoadedFromEdits() throws Exception {
+    /* Submits a re-encrypt command while re-encryption is paused, restarts
+     * through restartClusterDisableReencrypt() without saving the namespace,
+     * and checks that one zone is still queued afterwards and that it
+     * completes with 11 files once re-encryption is resumed.
+     *
+     * /zones/zone/[0-9]
+     * /zones/zone/dir/f
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+    final Path subdir = new Path(zone, "dir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
+
+    final Path encFile0 = new Path(zone, "0");
+    final Path encFile9 = new Path(zone, "9");
+    final FileEncryptionInfo fei0 = getFileEncryptionInfo(encFile0);
+    final FileEncryptionInfo fei9 = getFileEncryptionInfo(encFile9);
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // disable re-encrypt for testing, and issue a command
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+
+    // verify after restart the command is loaded
+    restartClusterDisableReencrypt();
+    waitForQueuedZones(1);
+
+    // Let the re-encryption start running.
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedZones(1);
+    assertKeyVersionChanged(encFile0, fei0);
+    assertKeyVersionChanged(encFile9, fei9);
+
+    // verify status
+    verifyZoneStatus(zone, fei0, 11);
+  }
+
+  /**
+   * Asserts that the first entry returned by listReencryptionStatus belongs
+   * to the given zone, is in the Completed state with a positive completion
+   * time later than its submission time, and reports the expected number of
+   * re-encrypted files.
+   *
+   * @param zone the zone expected as the first status entry
+   * @param fei if non-null, the status's ez key version name must differ
+   *            from the one in this encryption info
+   * @param expectedFiles the expected number of re-encrypted files
+   * @throws IOException if listing or iterating the statuses fails
+   */
+  private void verifyZoneStatus(final Path zone, final FileEncryptionInfo fei,
+      final long expectedFiles) throws IOException {
+    final RemoteIterator<ZoneReencryptionStatus> statuses =
+        dfsAdmin.listReencryptionStatus();
+    assertTrue(statuses.hasNext());
+
+    final ZoneReencryptionStatus status = statuses.next();
+    assertEquals(zone.toString(), status.getZoneName());
+    assertEquals(ZoneReencryptionStatus.State.Completed, status.getState());
+    assertTrue(status.getCompletionTime() > 0);
+    assertTrue(status.getCompletionTime() > status.getSubmissionTime());
+    if (fei != null) {
+      // the zone must have moved away from the given key version
+      assertNotEquals(fei.getEzKeyVersionName(),
+          status.getEzKeyVersionName());
+    }
+    assertEquals(expectedFiles, status.getFilesReencrypted());
+  }
+
+  @Test
+  public void testReencryptLoadedFromFsimage() throws Exception {
+    /* Submits a re-encrypt command while re-encryption is paused, saves the
+     * namespace in safe mode, restarts through
+     * restartClusterDisableReencrypt(), and checks that one zone is still
+     * queued afterwards and that it completes with 11 files once
+     * re-encryption is resumed.
+     *
+     * /zones/zone/[0-9]
+     * /zones/zone/dir/f
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+    final Path subdir = new Path(zone, "dir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
+
+    final Path encFile0 = new Path(zone, "0");
+    final Path encFile9 = new Path(zone, "9");
+    final FileEncryptionInfo fei0 = getFileEncryptionInfo(encFile0);
+    final FileEncryptionInfo fei9 = getFileEncryptionInfo(encFile9);
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // disable re-encrypt for testing, and issue a command
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+    // verify after loading from fsimage the command is loaded
+    restartClusterDisableReencrypt();
+    waitForQueuedZones(1);
+
+    // Let the re-encryption start running.
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedZones(1);
+    assertKeyVersionChanged(encFile0, fei0);
+    assertKeyVersionChanged(encFile9, fei9);
+
+    // verify status
+    verifyZoneStatus(zone, fei0, 11);
+  }
+
+  @Test
+  public void testReencryptCommandsQueuedOrdering() throws Exception {
+    final Path zoneParent = new Path("/zones");
+    final String zoneBaseName = zoneParent.toString() + "/zone";
+    final int numZones = 10;
+    for (int i = 0; i < numZones; ++i) {
+      final Path zone = new Path(zoneBaseName + i);
+      fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+      dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    }
+
+    // Disable re-encrypt for testing, and issue commands
+    getEzManager().pauseReencryptForTesting();
+    for (int i = 0; i < numZones; ++i) {
+      dfsAdmin.reencryptEncryptionZone(new Path(zoneBaseName + i),
+          ReencryptAction.START);
+    }
+    waitForQueuedZones(numZones);
+
+    // Verify commands are queued in the same order submitted
+    ReencryptionStatus rzs = new ReencryptionStatus(getReencryptionStatus());
+    for (int i = 0; i < numZones; ++i) {
+      Long zoneId = fsn.getFSDirectory().getINode(zoneBaseName + i).getId();
+      assertEquals(zoneId, rzs.getNextUnprocessedZone());
+      rzs.removeZone(zoneId);
+    }
+
+    // Cancel some zones
+    Set<Integer> cancelled = new HashSet<>(Arrays.asList(0, 3, 4));
+    for (int cancel : cancelled) {
+      dfsAdmin.reencryptEncryptionZone(new Path(zoneBaseName + cancel),
+          ReencryptAction.CANCEL);
+    }
+
+    restartClusterDisableReencrypt();
+    waitForQueuedZones(numZones - cancelled.size());
+    rzs = new ReencryptionStatus(getReencryptionStatus());
+    for (int i = 0; i < numZones; ++i) {
+      if (cancelled.contains(i)) {
+        continue;
+      }
+      Long zoneId = fsn.getFSDirectory().getINode(zoneBaseName + i).getId();
+      assertEquals(zoneId, rzs.getNextUnprocessedZone());
+      rzs.removeZone(zoneId);
+    }
+
+    // Verify the same is true after loading from FSImage
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+    restartClusterDisableReencrypt();
+    waitForQueuedZones(numZones - cancelled.size());
+    rzs = new ReencryptionStatus(getReencryptionStatus());
+    for (int i = 0; i < 10; ++i) {
+      if (cancelled.contains(i)) {
+        continue;
+      }
+      Long zoneId = fsn.getFSDirectory().getINode(zoneBaseName + i).getId();
+      assertEquals(zoneId, rzs.getNextUnprocessedZone());
+      rzs.removeZone(zoneId);
+    }
+  }
+
+  @Test
+  public void testReencryptNestedZones() throws Exception {
+    /* Re-encrypts encryption zones that contain nested encryption zones and
+     * checks the queue and the re-encrypted file counts: 2 files for '/'
+     * (/file and /dir/dfile) and 3 files for /level1, i.e. the files of the
+     * nested zones are not part of the parent zone's count.
+     *
+     * Setup dir as follows:
+     * / <- EZ
+     * /file
+     * /dir/dfile
+     * /level1  <- nested EZ
+     * /level1/fileL1-[0~2]
+     * /level1/level2/ <- nested EZ
+     * /level1/level2/fileL2-[0~3]
+     */
+    final int len = 8196;
+    final Path zoneRoot = new Path("/");
+    final Path zoneL1 = new Path(zoneRoot, "level1");
+    final Path zoneL2 = new Path(zoneL1, "level2");
+    final Path nonzoneDir = new Path(zoneRoot, "dir");
+    dfsAdmin.createEncryptionZone(zoneRoot, TEST_KEY, NO_TRASH);
+    DFSTestUtil
+        .createFile(fs, new Path(zoneRoot, "file"), len, (short) 1, 0xFEED);
+    DFSTestUtil
+        .createFile(fs, new Path(nonzoneDir, "dfile"), len, (short) 1, 0xFEED);
+    fsWrapper.mkdir(zoneL1, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zoneL1, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 3; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zoneL1, "fileL1-" + i), len, (short) 1,
+              0xFEED);
+    }
+    fsWrapper.mkdir(zoneL2, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zoneL2, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 4; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zoneL2, "fileL2-" + i), len, (short) 1,
+              0xFEED);
+    }
+
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // Disable re-encrypt, send re-encrypt on '/', verify queue
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zoneRoot, ReencryptAction.START);
+    waitForQueuedZones(1);
+    ReencryptionStatus rzs = getReencryptionStatus();
+    assertEquals(
+        (Long) fsn.getFSDirectory().getINode(zoneRoot.toString()).getId(),
+        rzs.getNextUnprocessedZone());
+
+    // Resume re-encrypt, verify files re-encrypted
+    getEzManager().resumeReencryptForTesting();
+    waitForZoneCompletes(zoneRoot.toString());
+    assertEquals(2, getZoneStatus(zoneRoot.toString()).getFilesReencrypted());
+
+    // Same tests on a child EZ.
+    getEzManager().resetMetricsForTesting();
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zoneL1, ReencryptAction.START);
+    waitForQueuedZones(1);
+    rzs = getReencryptionStatus();
+    assertEquals(
+        (Long) fsn.getFSDirectory().getINode(zoneL1.toString()).getId(),
+        rzs.getNextUnprocessedZone());
+
+    getEzManager().resumeReencryptForTesting();
+    waitForZoneCompletes(zoneL1.toString());
+    assertEquals(3, getZoneStatus(zoneL1.toString()).getFilesReencrypted());
+  }
+
+  /**
+   * Races file and directory creation against an in-flight re-encryption.
+   * The zone starts with 10 files, the handler is paused after its first
+   * submission, additional files and directories are created, and the final
+   * re-encrypted file count is expected to still be 10.
+   */
+  @Test
+  public void testRaceCreateHandler() throws Exception {
+    /* Setup dir as follows:
+     * /dir/file[0~9]
+     */
+    final int len = 8196;
+    final Path zone = new Path("/dir");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    // Only the 10 files created before the key roll are expected to be
+    // counted as re-encrypted.
+    int expected = 10;
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, "file" + i), len, (short) 1, 0xFEED);
+    }
+
+    // Roll the zone key so the existing files refer to an older key version.
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // Issue the command re-encrypt and pause it
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // mark pause after first checkpoint (5 files)
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    // Resume the re-encrypt thread
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedFiles(zone.toString(), 5);
+
+    /* creates the following:
+     * /dir/file8[0~5]
+     * /dir/dirsub/file[10-14]
+     * /dir/sub/file[15-19]
+     */
+    for (int i = 0; i < 6; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, "file8" + i), len, (short) 1, 0xFEED);
+    }
+    // We don't care about newly created files since they should already use
+    // the new EDEK, so the handler naturally continues the listing from the
+    // last checkpoint.
+    final Path subdir = new Path(zone, "dirsub");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    for (int i = 10; i < 15; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(subdir, "file" + i), len, (short) 1, 0xFEED);
+    }
+    // the above are created before checkpoint position, so not re-encrypted.
+    final Path sub = new Path(zone, "sub");
+    fsWrapper.mkdir(sub, FsPermission.getDirDefault(), true);
+    for (int i = 15; i < 20; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(sub, "file" + i), len, (short) 1, 0xFEED);
+    }
+
+    // resume re-encrypt thread which was paused after first checkpoint
+    getEzManager().resumeReencryptForTesting();
+    waitForZoneCompletes(zone.toString());
+    assertEquals(expected,
+        getZoneStatus(zone.toString()).getFilesReencrypted());
+  }
+
+  /**
+   * Races deletions against an in-flight re-encryption. The zone starts
+   * with 15 files; once the handler is paused after its first submission,
+   * two files and a 5-file subdirectory are deleted, and the final
+   * re-encrypted file count is expected to be 15 - 2 - 5 = 8.
+   */
+  @Test
+  public void testRaceDeleteHandler() throws Exception {
+    /* Setup dir as follows:
+     * /dir/file[0~9]
+     * /dir/subdir/file[10-14]
+     */
+    final int len = 8196;
+    final Path zone = new Path("/dir");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    // Starts at the total number of files created; reduced as files are
+    // deleted below.
+    int expected = 15;
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, "file" + i), len, (short) 1, 0xFEED);
+    }
+    final Path subdir = new Path(zone, "subdir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    for (int i = 10; i < 15; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(subdir, "file" + i), len, (short) 1, 0xFEED);
+    }
+
+    // Roll the zone key so the existing files refer to an older key version.
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // Issue the command re-encrypt and pause it
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // proceed to first checkpoint (5 files), delete files/subdir, then resume
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedFiles(zone.toString(), 5);
+
+    fsWrapper.delete(new Path(zone, "file5"), true);
+    fsWrapper.delete(new Path(zone, "file8"), true);
+    expected -= 2;
+    fsWrapper.delete(subdir, true);
+    expected -= 5;
+
+    // resume re-encrypt thread which was paused after first checkpoint
+    getEzManager().resumeReencryptForTesting();
+    waitForZoneCompletes(zone.toString());
+    assertEquals(expected,
+        getZoneStatus(zone.toString()).getFilesReencrypted());
+  }
+
+  /**
+   * Races deletions against the re-encryption updater. Same layout and
+   * deletions as the handler variant, but the updater is additionally
+   * paused after the zone's first checkpoint and the deletions happen
+   * before the updater is resumed. The final re-encrypted file count is
+   * expected to be 15 - 2 - 5 = 8.
+   */
+  @Test
+  public void testRaceDeleteUpdater() throws Exception {
+    /* Setup dir as follows:
+     * /dir/file[0~9]
+     * /dir/subdir/file[10-14]
+     */
+    final int len = 8196;
+    final Path zone = new Path("/dir");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    // Starts at the total number of files created; reduced as files are
+    // deleted below.
+    int expected = 15;
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, "file" + i), len, (short) 1, 0xFEED);
+    }
+    final Path subdir = new Path(zone, "subdir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    for (int i = 10; i < 15; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(subdir, "file" + i), len, (short) 1, 0xFEED);
+    }
+
+    // Roll the zone key so the existing files refer to an older key version.
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // Issue the command re-encrypt and pause it
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // proceed to first checkpoint (5 files), delete files/subdir, then resume
+    getEzManager().pauseForTestingAfterNthCheckpoint(zone.toString(), 1);
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    getEzManager().resumeReencryptForTesting();
+
+    waitForReencryptedFiles(zone.toString(), 5);
+    getEzManager().resumeReencryptForTesting();
+
+    // give handler thread some time to process the files before deletion.
+    Thread.sleep(3000);
+    fsWrapper.delete(new Path(zone, "file5"), true);
+    fsWrapper.delete(new Path(zone, "file8"), true);
+    expected -= 2;
+    fsWrapper.delete(subdir, true);
+    expected -= 5;
+
+    // resume updater thread which was paused after first checkpoint, verify
+    // deleted files are skipped.
+    getEzManager().resumeReencryptUpdaterForTesting();
+    waitForZoneCompletes(zone.toString());
+    assertEquals(expected,
+        getZoneStatus(zone.toString()).getFilesReencrypted());
+  }
+
+  /**
+   * Races deletion of the directory currently being processed against an
+   * in-flight re-encryption. The zone holds 10 files under /dir/subdir and
+   * 5 under /dir/subdir2; once the handler is paused after its first
+   * submission (5 files), /dir/subdir is deleted and the final re-encrypted
+   * file count is expected to be 15 - 5 = 10.
+   */
+  @Test
+  public void testRaceDeleteCurrentDirHandler() throws Exception {
+    /* Setup dir as follows:
+     * /dir/subdir/file[0~9]
+     * /dir/subdir2/file[10-14]
+     */
+    final int len = 8196;
+    final Path zone = new Path("/dir");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    final Path subdir = new Path(zone, "subdir");
+    // Starts at the total number of files created; reduced when subdir is
+    // deleted below.
+    int expected = 15;
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(subdir, "file" + i), len, (short) 1, 0xFEED);
+    }
+    final Path subdir2 = new Path(zone, "subdir2");
+    // Create the second directory explicitly; the original call passed
+    // subdir here, which left subdir2 to be created only as a side effect
+    // of the file creation below.
+    fsWrapper.mkdir(subdir2, FsPermission.getDirDefault(), true);
+    for (int i = 10; i < 15; ++i) {
+      DFSTestUtil.createFile(fs, new Path(subdir2, "file" + i), len, (short) 1,
+          0xFEED);
+    }
+
+    // Roll the zone key so the existing files refer to an older key version.
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // Issue the command re-encrypt and pause it
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // proceed to first checkpoint (5 files), delete subdir, then resume
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedFiles(zone.toString(), 5);
+
+    fsWrapper.delete(subdir, true);
+    expected -= 5;
+
+    // resume re-encrypt thread which was paused after first checkpoint
+    getEzManager().resumeReencryptForTesting();
+    waitForZoneCompletes(zone.toString());
+    assertEquals(expected,
+        getZoneStatus(zone.toString()).getFilesReencrypted());
+  }
+
+  /**
+   * Races deletion of the directory currently being processed against the
+   * re-encryption updater. Same layout as the handler variant, but the
+   * updater is additionally paused after the zone's first checkpoint and
+   * /dir/subdir is deleted before the updater is resumed. The final
+   * re-encrypted file count is expected to be 15 - 5 = 10.
+   */
+  @Test
+  public void testRaceDeleteCurrentDirUpdater() throws Exception {
+    /* Setup dir as follows:
+     * /dir/subdir/file[0~9]
+     * /dir/subdir2/file[10-14]
+     */
+    final int len = 8196;
+    final Path zone = new Path("/dir");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    final Path subdir = new Path(zone, "subdir");
+    // Starts at the total number of files created; reduced when subdir is
+    // deleted below.
+    int expected = 15;
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(subdir, "file" + i), len, (short) 1, 0xFEED);
+    }
+    final Path subdir2 = new Path(zone, "subdir2");
+    // Create the second directory explicitly; the original call passed
+    // subdir here, which left subdir2 to be created only as a side effect
+    // of the file creation below.
+    fsWrapper.mkdir(subdir2, FsPermission.getDirDefault(), true);
+    for (int i = 10; i < 15; ++i) {
+      DFSTestUtil.createFile(fs, new Path(subdir2, "file" + i), len, (short) 1,
+          0xFEED);
+    }
+
+    // Roll the zone key so the existing files refer to an older key version.
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // Issue the command re-encrypt and pause it
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // proceed to first checkpoint (5 files), delete subdir, then resume
+    getEzManager().pauseForTestingAfterNthCheckpoint(zone.toString(), 1);
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    getEzManager().resumeReencryptForTesting();
+
+    waitForReencryptedFiles(zone.toString(), 5);
+    getEzManager().resumeReencryptForTesting();
+
+    // give handler thread some time to process the files before deletion.
+    Thread.sleep(3000);
+    fsWrapper.delete(subdir, true);
+    expected -= 5;
+
+    // resume updater thread which was paused after first checkpoint, verify
+    // deleted files are skipped.
+    getEzManager().resumeReencryptUpdaterForTesting();
+    waitForZoneCompletes(zone.toString());
+    assertEquals(expected,
+        getZoneStatus(zone.toString()).getFilesReencrypted());
+  }
+
+  /**
+   * Deletes the whole encryption zone while its re-encryption is in flight.
+   * The futures held by the handler's submission trackers are collected via
+   * reflection before the delete; afterwards each is asserted to be done
+   * and the total number of tracked zones is expected to drop to 0.
+   */
+  @Test
+  public void testRaceDeleteZoneHandler() throws Exception {
+    /* Setup dir as follows:
+     * /dir/file[0~10]
+     */
+    final int len = 8196;
+    final Path zone = new Path("/dir");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 11; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, "file" + i), len, (short) 1, 0xFEED);
+    }
+
+    // Roll the zone key so the existing files refer to an older key version.
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // Issue the command re-encrypt and pause it
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // let both handler and updater pause, then delete zone.
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    getEzManager().pauseForTestingAfterNthCheckpoint(zone.toString(), 1);
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedFiles(zone.toString(), 5);
+    // Arm another pause-after-one-submission before resuming the handler.
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    getEzManager().resumeReencryptForTesting();
+
+    Thread.sleep(3000);
+    // Reach into the handler's private "submissions" map to snapshot the
+    // task futures that exist before the zone is deleted.
+    EncryptionZoneManager ezm = getEzManager();
+    ReencryptionHandler handler = (ReencryptionHandler) Whitebox
+        .getInternalState(ezm, "reencryptionHandler");
+    // NOTE(review): unchecked cast and raw Future types below; the generic
+    // signature of getTasks() is not visible here.
+    Map<Long, ZoneSubmissionTracker> tasks =
+        (Map<Long, ZoneSubmissionTracker>) Whitebox
+            .getInternalState(handler, "submissions");
+    List<Future> futures = new LinkedList<>();
+    for (ZoneSubmissionTracker zst : tasks.values()) {
+      for (Future f : zst.getTasks()) {
+        futures.add(f);
+      }
+    }
+    fsWrapper.delete(zone, true);
+    getEzManager().resumeReencryptForTesting();
+
+    // verify no running tasks
+    for (Future f : futures) {
+      assertTrue(f.isDone());
+    }
+
+    waitForTotalZones(0);
+  }
+
+  /**
+   * Deletes and re-creates a file while the handler is paused after its
+   * first submission. The zone starts with 10 files; /dir/file9 is deleted
+   * and created again, and the final re-encrypted file count is expected
+   * to be 9.
+   */
+  @Test
+  public void testRaceDeleteCreateHandler() throws Exception {
+    /* Setup dir as follows:
+     * /dir/file[0~9]
+     */
+    final int len = 8196;
+    final Path zone = new Path("/dir");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    // Starts at the number of files created; reduced when file9 is
+    // re-created below.
+    int expected = 10;
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, "file" + i), len, (short) 1, 0xFEED);
+    }
+
+    // Roll the zone key so the existing files refer to an older key version.
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // Issue the command re-encrypt and pause it
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // mark pause after first checkpoint (5 files)
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    // Resume the re-encrypt thread
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedFiles(zone.toString(), 5);
+
+    // Replace file9 with a fresh file at the same path.
+    final Path recreated = new Path(zone, "file9");
+    fsWrapper.delete(recreated, true);
+    DFSTestUtil.createFile(fs, recreated, len, (short) 2, 0xFEED);
+    expected -= 1; // newly created files use new edek, no need to re-encrypt
+
+    // resume re-encrypt thread which was paused after first checkpoint
+    getEzManager().resumeReencryptForTesting();
+    waitForZoneCompletes(zone.toString());
+    assertEquals(expected,
+        getZoneStatus(zone.toString()).getFilesReencrypted());
+  }
+
+  /**
+   * Deletes and re-creates a file while the updater is paused after the
+   * zone's first checkpoint. Besides checking that the final re-encrypted
+   * file count is 9, it verifies that the re-created file reports a
+   * different key version than the original and reads back the same
+   * content.
+   */
+  @Test
+  public void testRaceDeleteCreateUpdater() throws Exception {
+    /* Setup dir as follows:
+     * /dir/file[0~9]
+     */
+    final int len = 8196;
+    final Path zone = new Path("/dir");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    // Starts at the number of files created; reduced when file9 is
+    // re-created below.
+    int expected = 10;
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, "file" + i), len, (short) 1, 0xFEED);
+    }
+
+    // Roll the zone key so the existing files refer to an older key version.
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // Issue the command re-encrypt and pause it
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // mark pause after first checkpoint (5 files)
+    getEzManager().pauseForTestingAfterNthCheckpoint(zone.toString(), 1);
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedFiles(zone.toString(), 5);
+    getEzManager().resumeReencryptForTesting();
+
+    // give handler thread some time to process the files before deletion.
+    Thread.sleep(3000);
+    final Path recreated = new Path(zone, "file9");
+    // Capture the original encryption info and content for the comparisons
+    // at the end of the test.
+    final FileEncryptionInfo feiOrig = getFileEncryptionInfo(recreated);
+    final String contentOrig = DFSTestUtil.readFile(fs, recreated);
+    fsWrapper.delete(recreated, true);
+    DFSTestUtil.createFile(fs, recreated, len, (short) 2, 0xFEED);
+    expected -= 1;
+
+    // resume updater thread which was paused after first checkpoint
+    getEzManager().resumeReencryptUpdaterForTesting();
+    waitForZoneCompletes(zone.toString());
+    assertEquals(expected,
+        getZoneStatus(zone.toString()).getFilesReencrypted());
+
+    // verify new file is using its own edeks, with new keyversions,
+    // and can be decrypted correctly.
+    assertKeyVersionChanged(recreated, feiOrig);
+    final String content = DFSTestUtil.readFile(fs, recreated);
+    assertEquals(contentOrig, content);
+  }
+
+  // TODO: update test once HDFS-11203 is implemented.
+  /**
+   * Verifies that renaming a file inside a zone is rejected with an
+   * "under re-encryption" error while the zone is being re-encrypted, both
+   * when the handler is paused after its first submission and again after
+   * the handler is resumed with the updater paused.
+   */
+  @Test
+  public void testReencryptRaceRename() throws Exception {
+    /* Setup dir as follows:
+     * /dir/file[0~9]
+     * /dir/subdir/file[10-14]
+     */
+    final int len = 8196;
+    final Path zone = new Path("/dir");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, "file" + i), len, (short) 1, 0xFEED);
+    }
+    final Path subdir = new Path(zone, "subdir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    for (int i = 10; i < 15; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(subdir, "file" + i), len, (short) 1, 0xFEED);
+    }
+
+    // Roll the zone key so the existing files refer to an older key version.
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // Issue the command re-encrypt and pause it
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // mark pause after first checkpoint (5 files)
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    // Resume the re-encrypt thread
+    getEzManager().resumeReencryptForTesting();
+    waitForReencryptedFiles(zone.toString(), 5);
+
+    // First attempt: handler paused mid-zone.
+    try {
+      fsWrapper.rename(new Path(zone, "file8"), new Path(zone, "file08"));
+      fail("rename a file in an EZ should be disabled");
+    } catch (IOException e) {
+      assertExceptionContains("under re-encryption", e);
+    }
+
+    // resume handler and pause updater, test again.
+    getEzManager().pauseReencryptUpdaterForTesting();
+    getEzManager().resumeReencryptForTesting();
+    try {
+      fsWrapper.rename(new Path(zone, "file8"), new Path(zone, "file08"));
+      fail("rename a file in an EZ should be disabled");
+    } catch (IOException e) {
+      assertExceptionContains("under re-encryption", e);
+    }
+  }
+
+  /**
+   * Exercises re-encryption on a snapshottable zone. A START command on the
+   * snapshot path must fail with a SnapshotAccessControlException, while a
+   * START on the zone itself is expected to complete with 9 files
+   * re-encrypted and a changed key version on file "0".
+   */
+  @Test
+  public void testReencryptSnapshots() throws Exception {
+    /* Setup test dir:
+     * /zones/zone/[0-9]
+     * /dir/f
+     *
+     * /zones/zone is snapshottable, and rename file 5 to 5new,
+     * 6 to 6new then delete (so the file is only referred from a snapshot).
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.allowSnapshot(zone);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+    final Path subdir = new Path("/dir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
+    // create a snapshot and rename a file, so INodeReference is created.
+    final Path zoneSnap = fs.createSnapshot(zone);
+    fsWrapper.rename(new Path(zone, "5"), new Path(zone, "5new"));
+    fsWrapper.rename(new Path(zone, "6"), new Path(zone, "6new"));
+    fsWrapper.delete(new Path(zone, "6new"), true);
+
+    // test re-encrypt on snapshot dir
+    final Path encFile1 = new Path(zone, "0");
+    final FileEncryptionInfo fei0 = getFileEncryptionInfo(encFile1);
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    try {
+      dfsAdmin.reencryptEncryptionZone(zoneSnap, ReencryptAction.START);
+      fail("Reencrypt command on snapshot path should fail.");
+    } catch (RemoteException expected) {
+      LOG.info("Expected exception", expected);
+      assertTrue(expected
+          .unwrapRemoteException() instanceof SnapshotAccessControlException);
+    }
+    // Re-encrypt the live zone; 9 presumably reflects the 10 created files
+    // minus the deleted "6new" -- TODO confirm.
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(1);
+    waitForReencryptedFiles(zone.toString(), 9);
+    assertKeyVersionChanged(encFile1, fei0);
+  }
+
+  private void restartClusterDisableReencrypt() throws Exception {
+    cluster.restartNameNode(false);
+    fsn = cluster.getNamesystem();
+    getEzManager().pauseReencryptForTesting();
+    cluster.waitActive();
+  }
+
+  private void waitForReencryptedZones(final int expected)
+      throws TimeoutException, InterruptedException {
+    LOG.info("Waiting for re-encrypted zones to be {}", expected);
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return getReencryptionStatus().getNumZonesReencrypted() == expected;
+        }
+      }, 100, 10000);
+    } finally {
+      LOG.info("Re-encrypted zones = {} ",
+          getReencryptionStatus().getNumZonesReencrypted());
+    }
+  }
+
+  private void waitForQueuedZones(final int expected)
+      throws TimeoutException, InterruptedException {
+    LOG.info("Waiting for queued zones for re-encryption to be {}", expected);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return getReencryptionStatus().zonesQueued() == expected;
+      }
+    }, 100, 10000);
+  }
+
+  /**
+   * Waits, via {@code GenericTestUtils.waitFor}, until the total number of
+   * zones tracked in the re-encryption status equals {@code expected}.
+   *
+   * @param expected the total zone count to wait for
+   * @throws TimeoutException propagated from the underlying wait
+   * @throws InterruptedException propagated from the underlying wait
+   */
+  private void waitForTotalZones(final int expected)
+      throws TimeoutException, InterruptedException {
+    // The message previously said "queued zones", which did not match the
+    // zonesTotal() condition checked below.
+    LOG.info("Waiting for total zones for re-encryption to be {}", expected);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return getReencryptionStatus().zonesTotal() == expected;
+      }
+    }, 100, 10000);
+  }
+
+  private void waitForZoneCompletes(final String zone)
+      throws TimeoutException, InterruptedException {
+    LOG.info("Waiting for re-encryption zone {} to complete.", zone);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return getZoneStatus(zone).getState()
+              == ZoneReencryptionStatus.State.Completed;
+        } catch (Exception ex) {
+          LOG.error("Exception caught", ex);
+          return false;
+        }
+      }
+    }, 100, 10000);
+  }
+
+  private EncryptionZoneManager getEzManager() {
+    return fsn.getFSDirectory().ezManager;
+  }
+
+  private ReencryptionStatus getReencryptionStatus() {
+    return getEzManager().getReencryptionStatus();
+  }
+
+  private ZoneReencryptionStatus getZoneStatus(final String zone)
+      throws IOException {
+    return getEzManager().getZoneStatus(zone);
+  }
+
+  /**
+   * Waits, via {@code GenericTestUtils.waitFor}, until the given zone's
+   * re-encrypted file count equals {@code expected}. An IOException raised
+   * while reading the zone status is logged and treated as "not yet
+   * reached", matching how waitForZoneCompletes reports such failures.
+   *
+   * @param zone path of the zone whose status is checked
+   * @param expected the re-encrypted file count to wait for
+   * @throws TimeoutException propagated from the underlying wait
+   * @throws InterruptedException propagated from the underlying wait
+   */
+  private void waitForReencryptedFiles(final String zone, final int expected)
+      throws TimeoutException, InterruptedException {
+    LOG.info("Waiting for total re-encrypted file count to be {}", expected);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return getZoneStatus(zone).getFilesReencrypted() == expected;
+        } catch (IOException e) {
+          // Log instead of silently swallowing, so a later timeout can be
+          // traced back to its cause.
+          LOG.error("Exception caught", e);
+          return false;
+        }
+      }
+    }, 100, 10000);
+  }
+
+  /**
+   * Asserts that the file's current encryption-zone key version name
+   * differs from the one recorded in {@code original}.
+   *
+   * @param file the file whose current encryption info is fetched
+   * @param original encryption info captured earlier for comparison
+   */
+  private void assertKeyVersionChanged(final Path file,
+      final FileEncryptionInfo original) throws Exception {
+    final FileEncryptionInfo current = getFileEncryptionInfo(file);
+    final String versionBefore = original.getEzKeyVersionName();
+    final String versionNow = current.getEzKeyVersionName();
+    assertNotEquals("KeyVersion should be different",
+        versionBefore, versionNow);
+  }
+
+  /**
+   * Asserts that the file's current encryption-zone key version name
+   * equals the one recorded in {@code expected}.
+   *
+   * @param file the file whose current encryption info is fetched
+   * @param expected encryption info holding the expected key version name
+   */
+  private void assertKeyVersionEquals(final Path file,
+      final FileEncryptionInfo expected) throws Exception {
+    final FileEncryptionInfo current = getFileEncryptionInfo(file);
+    final String versionExpected = expected.getEzKeyVersionName();
+    final String versionNow = current.getEzKeyVersionName();
+    assertEquals("KeyVersion should be the same",
+        versionExpected, versionNow);
+  }
+
+  /**
+   * Exercises the CANCEL re-encryption action: cancelling a queued zone
+   * before any file is processed, re-issuing CANCEL afterwards, cancelling
+   * half-way after the first submission, and cancelling on a non-zone
+   * directory, a non-existing path and a file inside the zone.
+   */
+  @Test
+  public void testReencryptCancel() throws Exception {
+    /* Setup test dir:
+     * /zones/zone/[0-9]
+     * /dir/f
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+    final Path subdir = new Path("/dir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
+
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // disable, test basic
+    getEzManager().pauseReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForQueuedZones(1);
+
+    // Cancel while the zone is still queued; the zone should then complete
+    // with no files re-encrypted.
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL);
+    getEzManager().resumeReencryptForTesting();
+    waitForZoneCompletes(zone.toString());
+    assertEquals(0, getZoneStatus(zone.toString()).getFilesReencrypted());
+
+    // test same command resubmission
+    // NOTE(review): there is no fail() after the call, so this block also
+    // passes when no exception is thrown -- confirm whether that is
+    // intended.
+    try {
+      dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL);
+    } catch (RemoteException expected) {
+      assertExceptionContains("not under re-encryption", expected);
+    }
+
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // test cancelling half-way
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    getEzManager().resumeReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedFiles(zone.toString(), 5);
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL);
+    getEzManager().resumeReencryptForTesting();
+    waitForZoneCompletes(zone.toString());
+    assertEquals(5, getZoneStatus(zone.toString()).getFilesReencrypted());
+    // After the cancel, no checkpoint file and no next zone should remain.
+    assertNull(
+        getEzManager().getZoneStatus(zone.toString()).getLastCheckpointFile());
+    assertNull(getReencryptionStatus().getNextUnprocessedZone());
+
+    // test cancelling non-EZ dir
+    try {
+      dfsAdmin.reencryptEncryptionZone(subdir, ReencryptAction.CANCEL);
+      fail("Re-encrypting non-EZ should fail");
+    } catch (RemoteException expected) {
+      LOG.info("Expected exception caught.", expected);
+      assertExceptionContains("not the root of an encryption zone", expected);
+    }
+
+    // test cancelling non-existing dir
+    try {
+      dfsAdmin.reencryptEncryptionZone(new Path(zone, "notexist"),
+          ReencryptAction.CANCEL);
+      fail("Re-encrypting non-existing dir should fail");
+    } catch (RemoteException expected) {
+      LOG.info("Expected exception caught.", expected);
+      assertTrue(
+          expected.unwrapRemoteException() instanceof FileNotFoundException);
+    }
+
+    // test cancelling directly on a EZ file
+    final Path encFile = new Path(zone, "0");
+    try {
+      dfsAdmin.reencryptEncryptionZone(encFile, ReencryptAction.CANCEL);
+      fail("Re-encrypting on a file should fail");
+    } catch (RemoteException expected) {
+      LOG.info("Expected exception caught.", expected);
+      assertExceptionContains("not the root of an encryption zone", expected);
+    }
+
+    // final check - should only had 5 files re-encrypted overall.
+    assertEquals(5, getZoneStatus(zone.toString()).getFilesReencrypted());
+  }
+
+  /**
+   * Cancels a re-encryption while the updater is paused, then resumes the
+   * updater and verifies the zone completes with 0 files reported as
+   * re-encrypted.
+   */
+  @Test
+  public void testReencryptCancelForUpdater() throws Exception {
+    /* Setup test dir:
+     * /zones/zone/[0-9]
+     * /dir/f
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+    final Path subdir = new Path("/dir");
+    fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+    DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
+
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // disable, test basic
+    getEzManager().pauseReencryptUpdaterForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    // Fixed delay before cancelling; presumably gives the handler time to
+    // submit work while the updater is paused -- TODO confirm.
+    Thread.sleep(3000);
+
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL);
+    getEzManager().resumeReencryptUpdaterForTesting();
+    waitForZoneCompletes(zone.toString());
+    // Second fixed delay before reading the final count.
+    Thread.sleep(3000);
+    assertEquals(0, getZoneStatus(zone.toString()).getFilesReencrypted());
+
+  }
+
+  /**
+   * Re-encrypts a zone, restarts the NameNodes with the key provider path
+   * removed from the configuration, and verifies that START and CANCEL are
+   * then rejected while listReencryptionStatus still reports the completed
+   * zone with 10 files re-encrypted.
+   */
+  @Test
+  public void testReencryptionWithoutProvider() throws Exception {
+    /* Setup test dir:
+     * /zones/zone/[0-9]
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+
+    // re-encrypt the zone
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(1);
+
+    // start NN without providers
+    cluster.getConfiguration(0)
+        .unset(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    cluster.restartNameNodes();
+    cluster.waitActive();
+
+    // test re-encrypt should fail
+    try {
+      dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+      fail("should not be able to re-encrypt");
+    } catch (RemoteException expected) {
+      assertExceptionContains("rejected", expected.unwrapRemoteException());
+    }
+    // Cancel is expected to be rejected in the same way.
+    try {
+      dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL);
+      fail("should not be able to cancel re-encrypt");
+    } catch (RemoteException expected) {
+      assertExceptionContains("rejected", expected.unwrapRemoteException());
+    }
+
+    // test listReencryptionStatus should still work
+    RemoteIterator<ZoneReencryptionStatus> it =
+        dfsAdmin.listReencryptionStatus();
+    assertTrue(it.hasNext());
+    ZoneReencryptionStatus zs = it.next();
+    assertEquals(zone.toString(), zs.getZoneName());
+    assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
+    assertTrue(zs.getCompletionTime() > 0);
+    assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
+    assertEquals(10, zs.getFilesReencrypted());
+  }
+
+  /**
+   * Verifies that re-encryption makes no progress while the NameNode is in
+   * safe mode: after 5 files are re-encrypted, safe mode is entered and the
+   * reported status is checked three times to stay at 5 files with no
+   * completion time; after leaving safe mode the count is expected to
+   * reach 10.
+   */
+  @Test
+  public void testReencryptionNNSafeMode() throws Exception {
+    /* Setup test dir:
+     * /zones/zone/[0-9]
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+    // mark pause after first checkpoint (5 files)
+    getEzManager().pauseForTestingAfterNthSubmission(1);
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedFiles(zone.toString(), 5);
+
+    // Enter safe mode before resuming, then sample the status once per
+    // second for three seconds to confirm nothing advances.
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    getEzManager().resumeReencryptForTesting();
+    for (int i = 0; i < 3; ++i) {
+      Thread.sleep(1000);
+      RemoteIterator<ZoneReencryptionStatus> it =
+          dfsAdmin.listReencryptionStatus();
+      assertTrue(it.hasNext());
+      ZoneReencryptionStatus zs = it.next();
+      assertEquals(zone.toString(), zs.getZoneName());
+      assertEquals(0, zs.getCompletionTime());
+      assertEquals(5, zs.getFilesReencrypted());
+    }
+
+    // Leaving safe mode should let the remaining files be re-encrypted.
+    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+    waitForReencryptedFiles(zone.toString(), 10);
+  }
+
+  /**
+   * Injects a single failure into reencryptEncryptedKeys and verifies that
+   * the zone still reaches the Completed state, reporting 5 files
+   * re-encrypted and 5 re-encryption failures out of the 10 files created.
+   */
+  @Test
+  public void testReencryptionKMSDown() throws Exception {
+    // Fault injector that throws from reencryptEncryptedKeys for the first
+    // numFailures invocations and is a no-op afterwards.
+    class MyInjector extends EncryptionFaultInjector {
+      private volatile int exceptionCount = 0;
+
+      MyInjector(int numFailures) {
+        exceptionCount = numFailures;
+      }
+
+      @Override
+      public void reencryptEncryptedKeys() throws IOException {
+        if (exceptionCount > 0) {
+          --exceptionCount;
+          throw new IOException("Injected KMS failure");
+        }
+      }
+    }
+    final MyInjector injector = new MyInjector(1);
+    // NOTE(review): this replaces a static field and is not restored in
+    // this method; confirm that test teardown resets it.
+    EncryptionFaultInjector.instance = injector;
+    /* Setup test dir:
+     * /zones/zone/[0-9]
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+
+    // re-encrypt the zone
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(1);
+    // The single injected failure must have been consumed.
+    assertEquals(0, injector.exceptionCount);
+
+    // test listReencryptionStatus should still work
+    RemoteIterator<ZoneReencryptionStatus> it =
+        dfsAdmin.listReencryptionStatus();
+    assertTrue(it.hasNext());
+    ZoneReencryptionStatus zs = it.next();
+    assertEquals(zone.toString(), zs.getZoneName());
+    assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
+    assertTrue(zs.getCompletionTime() > 0);
+    assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
+    // Presumably one failed batch of 5 files and one successful batch of
+    // 5 -- TODO confirm the batch size against the test configuration.
+    assertEquals(5, zs.getFilesReencrypted());
+    assertEquals(5, zs.getNumReencryptionFailures());
+  }
+
+  @Test
+  public void testReencryptionUpdaterFaultOneTask() throws Exception {
+    class MyInjector extends EncryptionFaultInjector {
+      private volatile int exceptionCount = 0;
+
+      MyInjector(int numFailures) {
+        exceptionCount = numFailures;
+      }
+
+      @Override
+      public void reencryptUpdaterProcessOneTask() throws IOException {
+        if (exceptionCount > 0) {
+          --exceptionCount;
+          throw new IOException("Injected process task failure");
+        }
+      }
+    }
+    final MyInjector injector = new MyInjector(1);
+    EncryptionFaultInjector.instance = injector;
+    /* Setup test dir:
+     * /zones/zone/[0-9]
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+
+    // re-encrypt the zone
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(1);
+    assertEquals(0, injector.exceptionCount);
+
+    // test listReencryptionStatus should still work
+    RemoteIterator<ZoneReencryptionStatus> it =
+        dfsAdmin.listReencryptionStatus();
+    assertTrue(it.hasNext());
+    ZoneReencryptionStatus zs = it.next();
+    assertEquals(zone.toString(), zs.getZoneName());
+    assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
+    assertTrue(zs.getCompletionTime() > 0);
+    assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
+    assertEquals(5, zs.getFilesReencrypted());
+    assertEquals(1, zs.getNumReencryptionFailures());
+  }
+
+
+  @Test
+  public void testReencryptionUpdaterFaultCkpt() throws Exception {
+    class MyInjector extends EncryptionFaultInjector {
+      private volatile int exceptionCount = 0;
+
+      MyInjector(int numFailures) {
+        exceptionCount = numFailures;
+      }
+
+      @Override
+      public void reencryptUpdaterProcessCheckpoint() throws IOException {
+        if (exceptionCount > 0) {
+          --exceptionCount;
+          throw new IOException("Injected process checkpoint failure");
+        }
+      }
+    }
+    final MyInjector injector = new MyInjector(1);
+    EncryptionFaultInjector.instance = injector;
+    /* Setup test dir:
+     * /zones/zone/[0-9]
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+
+    // re-encrypt the zone
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(1);
+    assertEquals(0, injector.exceptionCount);
+
+    // test listReencryptionStatus should still work
+    RemoteIterator<ZoneReencryptionStatus> it =
+        dfsAdmin.listReencryptionStatus();
+    assertTrue(it.hasNext());
+    ZoneReencryptionStatus zs = it.next();
+    assertEquals(zone.toString(), zs.getZoneName());
+    assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
+    assertTrue(zs.getCompletionTime() > 0);
+    assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
+    assertEquals(10, zs.getFilesReencrypted());
+    assertEquals(1, zs.getNumReencryptionFailures());
+  }
+
+  @Test
+  public void testReencryptionUpdaterFaultRecover() throws Exception {
+    class MyInjector extends EncryptionFaultInjector {
+      private volatile int exceptionCount = 0;
+
+      MyInjector(int oneTask) {
+        exceptionCount = oneTask;
+      }
+
+      @Override
+      public void reencryptUpdaterProcessOneTask() throws IOException {
+        if (exceptionCount > 0) {
+          --exceptionCount;
+          throw new RetriableException("Injected process task failure");
+        }
+      }
+    }
+    final MyInjector injector = new MyInjector(10);
+    EncryptionFaultInjector.instance = injector;
+    /* Setup test dir:
+     * /zones/zone/[0-9]
+     */
+    final int len = 8196;
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    for (int i = 0; i < 10; ++i) {
+      DFSTestUtil
+          .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+              0xFEED);
+    }
+
+    // re-encrypt the zone
+    fsn.getProvider().rollNewVersion(TEST_KEY);
+    fsn.getProvider().flush();
+
+    final EncryptionZoneManager ezm = getEzManager();
+    final ReencryptionHandler handler = (ReencryptionHandler) Whitebox
+        .getInternalState(ezm, "reencryptionHandler");
+    final ReencryptionUpdater updater = (ReencryptionUpdater) Whitebox
+        .getInternalState(handler, "reencryptionUpdater");
+    Whitebox.setInternalState(updater, "faultRetryInterval", 50);
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForReencryptedZones(1);
+    assertEquals(0, injector.exceptionCount);
+
+    // test listReencryptionStatus should still work
+    RemoteIterator<ZoneReencryptionStatus> it =
+        dfsAdmin.listReencryptionStatus();
+    assertTrue(it.hasNext());
+    ZoneReencryptionStatus zs = it.next();
+    assertEquals(zone.toString(), zs.getZoneName());
+    assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
+    assertTrue(zs.getCompletionTime() > 0);
+    assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
+    assertEquals(10, zs.getFilesReencrypted());
+    assertEquals(0, zs.getNumReencryptionFailures());
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[4/4] hadoop git commit: HDFS-10899. Add functionality to re-encrypt EDEKs.

Posted by xi...@apache.org.
HDFS-10899. Add functionality to re-encrypt EDEKs.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1000a2af
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1000a2af
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1000a2af

Branch: refs/heads/trunk
Commit: 1000a2af04b24c123a3b08168f36b4e90420cab7
Parents: 26d8c8f
Author: Xiao Chen <xi...@apache.org>
Authored: Wed Aug 23 17:05:47 2017 -0700
Committer: Xiao Chen <xi...@apache.org>
Committed: Wed Aug 23 17:06:16 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   20 +
 .../hadoop/hdfs/DistributedFileSystem.java      |   34 +
 .../apache/hadoop/hdfs/client/HdfsAdmin.java    |   29 +
 .../hadoop/hdfs/protocol/ClientProtocol.java    |   25 +
 .../hadoop/hdfs/protocol/HdfsConstants.java     |    7 +
 .../hdfs/protocol/ReencryptionStatus.java       |  216 ++
 .../protocol/ReencryptionStatusIterator.java    |   58 +
 .../hdfs/protocol/ZoneReencryptionStatus.java   |  257 +++
 .../ClientNamenodeProtocolTranslatorPB.java     |   39 +
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  136 +-
 .../src/main/proto/ClientNamenodeProtocol.proto |    4 +
 .../src/main/proto/encryption.proto             |   41 +
 .../src/main/proto/hdfs.proto                   |   14 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   12 +
 ...tNamenodeProtocolServerSideTranslatorPB.java |   36 +
 .../namenode/EncryptionFaultInjector.java       |    9 +
 .../server/namenode/EncryptionZoneManager.java  |  351 +++-
 .../server/namenode/FSDirEncryptionZoneOp.java  |  238 ++-
 .../hdfs/server/namenode/FSDirWriteFileOp.java  |    4 +-
 .../hdfs/server/namenode/FSDirXAttrOp.java      |    7 +
 .../hdfs/server/namenode/FSDirectory.java       |   11 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   86 +
 .../hdfs/server/namenode/NameNodeRpcServer.java |   27 +
 .../server/namenode/ReencryptionHandler.java    |  940 +++++++++
 .../server/namenode/ReencryptionUpdater.java    |  523 +++++
 .../apache/hadoop/hdfs/tools/CryptoAdmin.java   |  134 +-
 .../src/main/resources/hdfs-default.xml         |   52 +
 .../src/site/markdown/TransparentEncryption.md  |   45 +-
 .../hdfs/server/namenode/TestReencryption.java  | 1847 ++++++++++++++++++
 .../namenode/TestReencryptionHandler.java       |  197 ++
 .../src/test/resources/testCryptoConf.xml       |   80 +
 31 files changed, 5443 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 47c14e2..9239df3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -121,6 +121,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -131,11 +132,13 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
+import org.apache.hadoop.hdfs.protocol.ReencryptionStatusIterator;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
@@ -2639,6 +2642,23 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return new EncryptionZoneIterator(namenode, tracer);
   }
 
+  public void reencryptEncryptionZone(String zone, ReencryptAction action)
+      throws IOException {
+    checkOpen();
+    try (TraceScope ignored = newPathTraceScope("reencryptEncryptionZone",
+        zone)) {
+      namenode.reencryptEncryptionZone(zone, action);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+          SafeModeException.class, UnresolvedPathException.class);
+    }
+  }
+
+  public RemoteIterator<ZoneReencryptionStatus> listReencryptionStatus()
+      throws IOException {
+    checkOpen();
+    return new ReencryptionStatusIterator(namenode, tracer);
+  }
 
   public void setErasureCodingPolicy(String src, String ecPolicyName)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index ceec2b3..f3605fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -83,11 +83,13 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -2314,6 +2316,38 @@ public class DistributedFileSystem extends FileSystem {
   }
 
   /* HDFS only */
+  public void reencryptEncryptionZone(final Path zone,
+      final ReencryptAction action) throws IOException {
+    final Path absF = fixRelativePart(zone);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p) throws IOException {
+        dfs.reencryptEncryptionZone(getPathName(p), action);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p) throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
+          myDfs.reencryptEncryptionZone(p, action);
+          return null;
+        }
+        throw new UnsupportedOperationException(
+            "Cannot call reencryptEncryptionZone"
+                + " on a symlink to a non-DistributedFileSystem: " + zone
+                + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
+
+  /* HDFS only */
+  public RemoteIterator<ZoneReencryptionStatus> listReencryptionStatus()
+      throws IOException {
+    return dfs.listReencryptionStatus();
+  }
+
+  /* HDFS only */
   public FileEncryptionInfo getFileEncryptionInfo(final Path path)
       throws IOException {
     Path absF = fixRelativePart(path);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
index abf341e..85a7efe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -370,6 +372,33 @@ public class HdfsAdmin {
   }
 
   /**
+   * Performs re-encryption action for a given encryption zone.
+   *
+   * @param zone the root of the encryption zone
+   * @param action the re-encrypt action
+   * @throws IOException If any error occurs when handling re-encrypt action.
+   */
+  public void reencryptEncryptionZone(final Path zone,
+      final ReencryptAction action) throws IOException {
+    dfs.reencryptEncryptionZone(zone, action);
+  }
+
+  /**
+   * Returns a RemoteIterator which can be used to list all re-encryption
+   * information. For large numbers of re-encryptions, the iterator will fetch
+   * the list in a number of small batches.
+   * <p>
+   * Since the list is fetched in batches, it does not represent a
+   * consistent snapshot of the entire list of re-encryption statuses.
+   * <p>
+   * This method can only be called by HDFS superusers.
+   */
+  public RemoteIterator<ZoneReencryptionStatus> listReencryptionStatus()
+      throws IOException {
+    return dfs.listReencryptionStatus();
+  }
+
+  /**
    * Returns the FileEncryptionInfo on the HdfsFileStatus for the given path.
    * The return value can be null if the path points to a directory, or a file
    * that is not in an encryption zone.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index b0e85e5..b550467 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -1444,6 +1445,30 @@ public interface ClientProtocol {
       long prevId) throws IOException;
 
   /**
+   * Used to implement re-encryption of encryption zones.
+   *
+   * @param zone the encryption zone to re-encrypt.
+   * @param action the action for the re-encryption.
+   * @throws IOException
+   */
+  @AtMostOnce
+  void reencryptEncryptionZone(String zone, ReencryptAction action)
+      throws IOException;
+
+  /**
+   * Used to implement cursor-based batched listing of
+   * {@link ZoneReencryptionStatus}s.
+   *
+   * @param prevId ID of the last item in the previous batch. If there is no
+   *               previous batch, a negative value can be used.
+   * @return Batch of zone re-encryption statuses.
+   * @throws IOException
+   */
+  @Idempotent
+  BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(long prevId)
+      throws IOException;
+
+  /**
    * Set xattr of a file or directory.
    * The name must be prefixed with the namespace followed by ".". For example,
    * "user.attr".

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index 2681f12..8c44293 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -144,6 +144,13 @@ public final class HdfsConstants {
     ALL, LIVE, DEAD, DECOMMISSIONING, ENTERING_MAINTENANCE, IN_MAINTENANCE
   }
 
+  /**
+   * Re-encrypt encryption zone actions.
+   */
+  public enum ReencryptAction {
+    CANCEL, START
+  }
+
   /* Hidden constructor */
   protected HdfsConstants() {
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatus.java
new file mode 100644
index 0000000..e83ab52
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatus.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * A class representing information about re-encrypting encryption zones. It
+ * contains a collection of {@code ZoneReencryptionStatus} for each EZ.
+ * <p>
+ * FSDirectory lock is used for synchronization (except test-only methods, which
+ * are not protected).
+ */
+@InterfaceAudience.Private
+public final class ReencryptionStatus {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ReencryptionStatus.class);
+
+  public static final BatchedListEntries<ZoneReencryptionStatus> EMPTY_LIST =
+      new BatchedListEntries<>(Lists.newArrayList(), false);
+
+  /**
+   * The zones that were submitted for re-encryption. This should preserve
+   * the order of submission.
+   */
+  private final TreeMap<Long, ZoneReencryptionStatus> zoneStatuses;
+  // Metrics
+  private long zonesReencrypted;
+
+  public ReencryptionStatus() {
+    zoneStatuses = new TreeMap<>();
+  }
+
+  @VisibleForTesting
+  public ReencryptionStatus(ReencryptionStatus rhs) {
+    if (rhs != null) {
+      this.zoneStatuses = new TreeMap<>(rhs.zoneStatuses);
+      this.zonesReencrypted = rhs.zonesReencrypted;
+    } else {
+      zoneStatuses = new TreeMap<>();
+    }
+  }
+
+  @VisibleForTesting
+  public void resetMetrics() {
+    zonesReencrypted = 0;
+    for (Map.Entry<Long, ZoneReencryptionStatus> entry : zoneStatuses
+        .entrySet()) {
+      entry.getValue().resetMetrics();
+    }
+  }
+
+  public ZoneReencryptionStatus getZoneStatus(final Long zondId) {
+    return zoneStatuses.get(zondId);
+  }
+
+  public void markZoneForRetry(final Long zoneId) {
+    final ZoneReencryptionStatus zs = zoneStatuses.get(zoneId);
+    Preconditions.checkNotNull(zs, "Cannot find zone " + zoneId);
+    LOG.info("Zone {} will retry re-encryption", zoneId);
+    zs.setState(State.Submitted);
+  }
+
+  public void markZoneStarted(final Long zoneId) {
+    final ZoneReencryptionStatus zs = zoneStatuses.get(zoneId);
+    Preconditions.checkNotNull(zs, "Cannot find zone " + zoneId);
+    LOG.info("Zone {} starts re-encryption processing", zoneId);
+    zs.setState(State.Processing);
+  }
+
+  public void markZoneCompleted(final Long zoneId) {
+    final ZoneReencryptionStatus zs = zoneStatuses.get(zoneId);
+    Preconditions.checkNotNull(zs, "Cannot find zone " + zoneId);
+    LOG.info("Zone {} completed re-encryption.", zoneId);
+    zs.setState(State.Completed);
+    zonesReencrypted++;
+  }
+
+  public Long getNextUnprocessedZone() {
+    for (Map.Entry<Long, ZoneReencryptionStatus> entry : zoneStatuses
+        .entrySet()) {
+      if (entry.getValue().getState() == State.Submitted) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+  public boolean hasRunningZone(final Long zoneId) {
+    return zoneStatuses.containsKey(zoneId)
+        && zoneStatuses.get(zoneId).getState() != State.Completed;
+  }
+
+  /**
+   * @param zoneId
+   * @return true if the zone is added.
+   */
+  private boolean addZoneIfNecessary(final Long zoneId, final String name,
+      final ReencryptionInfoProto reProto) {
+    if (!zoneStatuses.containsKey(zoneId)) {
+      LOG.debug("Adding zone {} for re-encryption status", zoneId);
+      Preconditions.checkNotNull(reProto);
+      final ZoneReencryptionStatus.Builder builder =
+          new ZoneReencryptionStatus.Builder();
+      builder.id(zoneId).zoneName(name)
+          .ezKeyVersionName(reProto.getEzKeyVersionName())
+          .submissionTime(reProto.getSubmissionTime())
+          .canceled(reProto.getCanceled())
+          .filesReencrypted(reProto.getNumReencrypted())
+          .fileReencryptionFailures(reProto.getNumFailures());
+      if (reProto.hasCompletionTime()) {
+        builder.completionTime(reProto.getCompletionTime());
+        builder.state(State.Completed);
+        zonesReencrypted++;
+      } else {
+        builder.state(State.Submitted);
+      }
+      if (reProto.hasLastFile()) {
+        builder.lastCheckpointFile(reProto.getLastFile());
+      }
+      return zoneStatuses.put(zoneId, builder.build()) == null;
+    }
+    return false;
+  }
+
+  public void updateZoneStatus(final Long zoneId, final String zonePath,
+      final ReencryptionInfoProto reProto) {
+    Preconditions.checkArgument(zoneId != null, "zoneId can't be null");
+    if (addZoneIfNecessary(zoneId, zonePath, reProto)) {
+      return;
+    }
+    final ZoneReencryptionStatus zs = getZoneStatus(zoneId);
+    assert zs != null;
+    if (reProto.hasCompletionTime()) {
+      zs.markZoneCompleted(reProto);
+    } else if (!reProto.hasLastFile() && !reProto.hasCompletionTime()) {
+      zs.markZoneSubmitted(reProto);
+    } else {
+      zs.updateZoneProcess(reProto);
+    }
+  }
+
+  public boolean removeZone(final Long zoneId) {
+    LOG.debug("Removing re-encryption status of zone {} ", zoneId);
+    return zoneStatuses.remove(zoneId) != null;
+  }
+
+  @VisibleForTesting
+  public int zonesQueued() {
+    int ret = 0;
+    for (Map.Entry<Long, ZoneReencryptionStatus> entry : zoneStatuses
+        .entrySet()) {
+      if (entry.getValue().getState() == State.Submitted) {
+        ret++;
+      }
+    }
+    return ret;
+  }
+
+  @VisibleForTesting
+  public int zonesTotal() {
+    return zoneStatuses.size();
+  }
+
+  @VisibleForTesting
+  public long getNumZonesReencrypted() {
+    return zonesReencrypted;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<Long, ZoneReencryptionStatus> entry : zoneStatuses
+        .entrySet()) {
+      sb.append("[zone:" + entry.getKey());
+      sb.append(" state:" + entry.getValue().getState());
+      sb.append(" lastProcessed:" + entry.getValue().getLastCheckpointFile());
+      sb.append(" filesReencrypted:" + entry.getValue().getFilesReencrypted());
+      sb.append(" fileReencryptionFailures:" + entry.getValue()
+          .getNumReencryptionFailures() + "]");
+    }
+    return sb.toString();
+  }
+
+  public NavigableMap<Long, ZoneReencryptionStatus> getZoneStatuses() {
+    return zoneStatuses;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatusIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatusIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatusIterator.java
new file mode 100644
index 0000000..c8a8857
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatusIterator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+
+import java.io.IOException;
+
+/**
+ * ReencryptionStatusIterator is a remote iterator that iterates over the
+ * reencryption status of encryption zones.
+ * It supports retrying in case of namenode failover.
+ */
+@InterfaceAudience.Private
+public class ReencryptionStatusIterator
+    extends BatchedRemoteIterator<Long, ZoneReencryptionStatus> {
+
+  private final ClientProtocol namenode;
+  private final Tracer tracer;
+
+  public ReencryptionStatusIterator(ClientProtocol namenode, Tracer tracer) {
+    super((long) 0);
+    this.namenode = namenode;
+    this.tracer = tracer;
+  }
+
+  @Override
+  public BatchedEntries<ZoneReencryptionStatus> makeRequest(Long prevId)
+      throws IOException {
+    try (TraceScope ignored = tracer.newScope("listReencryptionStatus")) {
+      return namenode.listReencryptionStatus(prevId);
+    }
+  }
+
+  @Override
+  public Long elementToPrevKey(ZoneReencryptionStatus entry) {
+    return entry.getId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ZoneReencryptionStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ZoneReencryptionStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ZoneReencryptionStatus.java
new file mode 100644
index 0000000..9022b9f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ZoneReencryptionStatus.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
+
+/**
+ * A class representing information about re-encryption of an encryption zone.
+ * <p>
+ * FSDirectory lock is used for synchronization (except test-only methods, which
+ * are not protected).
+ */
+public class ZoneReencryptionStatus {
+  /**
+   * State of re-encryption.
+   */
+  public enum State {
+    /**
+     * Submitted for re-encryption but hasn't been picked up.
+     * This is the initial state.
+     */
+    Submitted,
+    /**
+     * Currently re-encrypting.
+     */
+    Processing,
+    /**
+     * Re-encryption completed.
+     */
+    Completed
+  }
+
+  private long id;
+  private String zoneName;
+  /**
+   * The re-encryption status of the zone. Note this is an in-memory only
+   * variable. On failover it will always be submitted, or completed if
+   * completionTime != 0.
+   */
+  private State state;
+  private String ezKeyVersionName;
+  private long submissionTime;
+  private long completionTime;
+  private boolean canceled;
+  /**
+   * Name of last file processed. It's important to record name (not inode)
+   * because we want to restore to the position even if the inode is removed.
+   */
+  private String lastCheckpointFile;
+  private long filesReencrypted;
+  private long numReencryptionFailures;
+
+  /**
+   * Builder of {@link ZoneReencryptionStatus}.
+   */
+  public static final class Builder {
+    private long id;
+    private String zoneName;
+    private State state;
+    private String ezKeyVersionName;
+    private long submissionTime;
+    private long completionTime;
+    private boolean canceled;
+    private String lastCheckpointFile;
+    private long filesReencrypted;
+    private long fileReencryptionFailures;
+
+    public Builder() {
+    }
+
+    public Builder id(final long inodeid) {
+      id = inodeid;
+      return this;
+    }
+
+    public Builder zoneName(final String ezName) {
+      zoneName = ezName;
+      return this;
+    }
+
+    public Builder state(final State st) {
+      state = st;
+      return this;
+    }
+
+    public Builder ezKeyVersionName(final String ezkvn) {
+      ezKeyVersionName = ezkvn;
+      return this;
+    }
+
+    public Builder submissionTime(final long submission) {
+      submissionTime = submission;
+      return this;
+    }
+
+    public Builder completionTime(final long completion) {
+      completionTime = completion;
+      return this;
+    }
+
+    public Builder canceled(final boolean isCanceled) {
+      canceled = isCanceled;
+      return this;
+    }
+
+    public Builder lastCheckpointFile(final String lastFile) {
+      lastCheckpointFile = lastFile;
+      return this;
+    }
+
+    public Builder filesReencrypted(final long numReencrypted) {
+      filesReencrypted = numReencrypted;
+      return this;
+    }
+
+    public Builder fileReencryptionFailures(final long numFailures) {
+      fileReencryptionFailures = numFailures;
+      return this;
+    }
+
+    public ZoneReencryptionStatus build() { // validates required fields first
+      Preconditions.checkArgument(id != 0, "no inode id set.");
+      Preconditions.checkNotNull(state, "no state set.");
+      Preconditions.checkNotNull(ezKeyVersionName, "no keyVersionName set.");
+      Preconditions
+          .checkArgument(submissionTime != 0, "no submission time set.");
+      ZoneReencryptionStatus ret = new ZoneReencryptionStatus();
+      ret.id = this.id;
+      ret.zoneName = this.zoneName;
+      ret.state = this.state;
+      ret.ezKeyVersionName = this.ezKeyVersionName;
+      ret.submissionTime = this.submissionTime;
+      ret.completionTime = this.completionTime;
+      ret.canceled = this.canceled;
+      ret.lastCheckpointFile = this.lastCheckpointFile;
+      ret.filesReencrypted = this.filesReencrypted;
+      ret.numReencryptionFailures = this.fileReencryptionFailures;
+      return ret;
+    }
+  }
+
+  public ZoneReencryptionStatus() {
+    reset();
+  }
+
+  void resetMetrics() {
+    filesReencrypted = 0;
+    numReencryptionFailures = 0;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public String getZoneName() {
+    return zoneName;
+  }
+
+  void setState(final State s) {
+    state = s;
+  }
+
+  public State getState() {
+    return state;
+  }
+
+  public String getEzKeyVersionName() {
+    return ezKeyVersionName;
+  }
+
+  public long getSubmissionTime() {
+    return submissionTime;
+  }
+
+  public long getCompletionTime() {
+    return completionTime;
+  }
+
+  public boolean isCanceled() {
+    return canceled;
+  }
+
+  public String getLastCheckpointFile() {
+    return lastCheckpointFile;
+  }
+
+  public long getFilesReencrypted() {
+    return filesReencrypted;
+  }
+
+  public long getNumReencryptionFailures() {
+    return numReencryptionFailures;
+  }
+
+  public void reset() {
+    state = State.Submitted;
+    ezKeyVersionName = null;
+    submissionTime = 0;
+    completionTime = 0;
+    canceled = false;
+    lastCheckpointFile = null;
+    resetMetrics();
+  }
+
+  /**
+   * Set the zone name; null is rejected. The zone name is resolved from the
+   * inode id during a listReencryptionStatus call, for the crypto admin.
+   */
+  public void setZoneName(final String name) {
+    Preconditions.checkNotNull(name, "zone name cannot be null");
+    zoneName = name;
+  }
+
+  public void cancel() {
+    canceled = true;
+  }
+
+  void markZoneCompleted(final ReencryptionInfoProto proto) {
+    state = ZoneReencryptionStatus.State.Completed;
+    completionTime = proto.getCompletionTime();
+    lastCheckpointFile = null;
+    canceled = proto.getCanceled();
+    filesReencrypted = proto.getNumReencrypted();
+    numReencryptionFailures = proto.getNumFailures();
+  }
+
+  void markZoneSubmitted(final ReencryptionInfoProto proto) {
+    // reset() already sets state to Submitted; only proto-backed fields follow.
+    reset();
+    ezKeyVersionName = proto.getEzKeyVersionName();
+    submissionTime = proto.getSubmissionTime();
+    filesReencrypted = proto.getNumReencrypted();
+    numReencryptionFailures = proto.getNumFailures();
+  }
+
+  void updateZoneProcess(final ReencryptionInfoProto proto) {
+    lastCheckpointFile = proto.getLastFile();
+    filesReencrypted = proto.getNumReencrypted();
+    numReencryptionFailures = proto.getNumFailures();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index ac06c1a..ec7d93f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.BlocksStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -180,6 +182,10 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncrypt
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ZoneReencryptionStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
@@ -1545,6 +1551,39 @@ public class ClientNamenodeProtocolTranslatorPB implements
   }
 
   @Override
+  public void reencryptEncryptionZone(String zone, ReencryptAction action)
+      throws IOException {
+    // Assemble the request in one chain; only the RPC sits in the try block.
+    final ReencryptEncryptionZoneRequestProto request =
+        ReencryptEncryptionZoneRequestProto.newBuilder().setZone(zone)
+            .setAction(PBHelperClient.convert(action)).build();
+    try {
+      rpcProxy.reencryptEncryptionZone(null, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(long id)
+      throws IOException {
+    final ListReencryptionStatusRequestProto request =
+        ListReencryptionStatusRequestProto.newBuilder().setId(id).build();
+    final ListReencryptionStatusResponseProto resp;
+    try { // keep the try scoped to the RPC itself
+      resp = rpcProxy.listReencryptionStatus(null, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    final List<ZoneReencryptionStatus> statuses =
+        Lists.newArrayListWithCapacity(resp.getStatusesCount());
+    for (ZoneReencryptionStatusProto statusProto : resp.getStatusesList()) {
+      statuses.add(PBHelperClient.convert(statusProto));
+    }
+    return new BatchedListEntries<>(statuses, resp.getHasMore());
+  }
+
+  @Override
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
     SetXAttrRequestProto req = SetXAttrRequestProto.newBuilder()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 5b1a687..30a3108 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -99,6 +100,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryScopeProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTypeProto;
@@ -129,6 +131,9 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeMo
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptActionProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptionStateProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ZoneReencryptionStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AddECPolicyResponseProto;
@@ -157,6 +162,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.QuotaUsageProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RollingUpgradeStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportProto;
@@ -165,6 +171,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectorySt
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ZoneEncryptionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
@@ -1678,6 +1685,17 @@ public class PBHelperClient {
     return builder.build();
   }
 
+  public static ReencryptActionProto convert(ReencryptAction a) {
+    switch (a) {
+    case CANCEL:
+      return ReencryptActionProto.CANCEL_REENCRYPT;
+    case START:
+      return ReencryptActionProto.START_REENCRYPT;
+    default:
+      throw new IllegalArgumentException("Unexpected value: " + a);
+    }
+  }
+
   public static RollingUpgradeActionProto convert(RollingUpgradeAction a) {
     switch (a) {
     case QUERY:
@@ -2282,6 +2300,17 @@ public class PBHelperClient {
     }
   }
 
+  public static ReencryptAction convert(ReencryptActionProto a) {
+    switch (a) {
+    case CANCEL_REENCRYPT:
+      return ReencryptAction.CANCEL;
+    case START_REENCRYPT:
+      return ReencryptAction.START;
+    default:
+      throw new IllegalArgumentException("Unexpected value: " + a);
+    }
+  }
+
   public static RollingUpgradeAction convert(RollingUpgradeActionProto a) {
     switch (a) {
     case QUERY:
@@ -2733,16 +2762,24 @@ public class PBHelperClient {
         .build();
   }
 
-  public static HdfsProtos.ZoneEncryptionInfoProto convert(
-      CipherSuite suite, CryptoProtocolVersion version, String keyName) {
+  public static ZoneEncryptionInfoProto convert(CipherSuite suite,
+      CryptoProtocolVersion version, String keyName) {
+    return convert(suite, version, keyName, null);
+  }
+
+  public static ZoneEncryptionInfoProto convert(CipherSuite suite,
+      CryptoProtocolVersion version, String keyName,
+      ReencryptionInfoProto proto) {
     if (suite == null || version == null || keyName == null) {
       return null;
     }
-    return HdfsProtos.ZoneEncryptionInfoProto.newBuilder()
-        .setSuite(convert(suite))
-        .setCryptoProtocolVersion(convert(version))
-        .setKeyName(keyName)
-        .build();
+    ZoneEncryptionInfoProto.Builder builder =
+        ZoneEncryptionInfoProto.newBuilder().setSuite(convert(suite))
+            .setCryptoProtocolVersion(convert(version)).setKeyName(keyName);
+    if (proto != null) {
+      builder.setReencryptionProto(proto);
+    }
+    return builder.build();
   }
 
   public static FileEncryptionInfo convert(
@@ -2759,6 +2796,91 @@ public class PBHelperClient {
         ezKeyVersionName);
   }
 
+  public static ReencryptionInfoProto convert(String ezkvn, Long submissionTime,
+      boolean isCanceled, long numReencrypted, long numFailures,
+      Long completionTime, String lastFile) {
+    if (ezkvn == null || submissionTime == null) {
+      return null;
+    }
+    ReencryptionInfoProto.Builder builder =
+        ReencryptionInfoProto.newBuilder().setEzKeyVersionName(ezkvn)
+            .setSubmissionTime(submissionTime).setCanceled(isCanceled)
+            .setNumReencrypted(numReencrypted).setNumFailures(numFailures);
+    if (completionTime != null) {
+      builder.setCompletionTime(completionTime);
+    }
+    if (lastFile != null) {
+      builder.setLastFile(lastFile);
+    }
+    return builder.build();
+  }
+
+  public static ZoneReencryptionStatusProto convert(ZoneReencryptionStatus zs) {
+    ZoneReencryptionStatusProto.Builder builder =
+        ZoneReencryptionStatusProto.newBuilder()
+        .setId(zs.getId())
+        .setPath(zs.getZoneName())
+        .setEzKeyVersionName(zs.getEzKeyVersionName())
+        .setSubmissionTime(zs.getSubmissionTime())
+        .setCanceled(zs.isCanceled())
+        .setNumReencrypted(zs.getFilesReencrypted())
+        .setNumFailures(zs.getNumReencryptionFailures());
+    switch (zs.getState()) {
+    case Submitted:
+      builder.setState(ReencryptionStateProto.SUBMITTED);
+      break;
+    case Processing:
+      builder.setState(ReencryptionStateProto.PROCESSING);
+      break;
+    case Completed:
+      builder.setState(ReencryptionStateProto.COMPLETED);
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown state " + zs.getState());
+    }
+    final long completion = zs.getCompletionTime();
+    if (completion != 0) {
+      builder.setCompletionTime(completion);
+    }
+    final String file = zs.getLastCheckpointFile();
+    if (file != null) {
+      builder.setLastFile(file);
+    }
+    return builder.build();
+  }
+
+  public static ZoneReencryptionStatus convert(
+      ZoneReencryptionStatusProto proto) {
+    // Plain fields go in first; state and the optional fields follow.
+    final ZoneReencryptionStatus.Builder builder =
+        new ZoneReencryptionStatus.Builder()
+        .id(proto.getId()).zoneName(proto.getPath())
+        .ezKeyVersionName(proto.getEzKeyVersionName())
+        .submissionTime(proto.getSubmissionTime()).canceled(proto.getCanceled())
+        .filesReencrypted(proto.getNumReencrypted())
+        .fileReencryptionFailures(proto.getNumFailures());
+    switch (proto.getState()) {
+    case SUBMITTED:
+      builder.state(ZoneReencryptionStatus.State.Submitted);
+      break;
+    case PROCESSING:
+      builder.state(ZoneReencryptionStatus.State.Processing);
+      break;
+    case COMPLETED:
+      builder.state(ZoneReencryptionStatus.State.Completed);
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown state " + proto.getState());
+    }
+    if (proto.hasCompletionTime()) {
+      builder.completionTime(proto.getCompletionTime());
+    }
+    if (proto.hasLastFile()) {
+      builder.lastCheckpointFile(proto.getLastFile());
+    }
+    return builder.build();
+  }
+
   public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {
     List<DatanodeInfoProto> proto = datanodeInfosProto.getDatanodesList();
     DatanodeInfo[] infos = new DatanodeInfo[proto.size()];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index 4f44c5e..3f108fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -941,6 +941,10 @@ service ClientNamenodeProtocol {
       returns(CreateEncryptionZoneResponseProto);
   rpc listEncryptionZones(ListEncryptionZonesRequestProto)
       returns(ListEncryptionZonesResponseProto);
+  rpc reencryptEncryptionZone(ReencryptEncryptionZoneRequestProto)
+      returns(ReencryptEncryptionZoneResponseProto);
+  rpc listReencryptionStatus(ListReencryptionStatusRequestProto)
+      returns(ListReencryptionStatusResponseProto);
   rpc getEZForPath(GetEZForPathRequestProto)
       returns(GetEZForPathResponseProto);
   rpc setErasureCodingPolicy(SetErasureCodingPolicyRequestProto)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/encryption.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/encryption.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/encryption.proto
index 68b2f3a..75d3a0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/encryption.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/encryption.proto
@@ -58,6 +58,47 @@ message ListEncryptionZonesResponseProto {
   required bool hasMore = 2;
 }
 
+enum ReencryptActionProto {
+  CANCEL_REENCRYPT = 1;
+  START_REENCRYPT = 2;
+}
+
+message ReencryptEncryptionZoneRequestProto {
+  required ReencryptActionProto action = 1;
+  required string zone = 2;
+}
+
+message ReencryptEncryptionZoneResponseProto {
+}
+
+message ListReencryptionStatusRequestProto {
+  required int64 id = 1;
+}
+
+enum ReencryptionStateProto {
+  SUBMITTED = 1;
+  PROCESSING = 2;
+  COMPLETED = 3;
+}
+
+message ZoneReencryptionStatusProto {
+  required int64 id = 1;
+  required string path = 2;
+  required ReencryptionStateProto state = 3;
+  required string ezKeyVersionName = 4;
+  required int64 submissionTime = 5;
+  required bool canceled = 6;
+  required int64 numReencrypted = 7;
+  required int64 numFailures = 8;
+  optional int64 completionTime = 9;
+  optional string lastFile = 10;
+}
+
+message ListReencryptionStatusResponseProto {
+  repeated ZoneReencryptionStatusProto statuses = 1;
+  required bool hasMore = 2;
+}
+
 message GetEZForPathRequestProto {
     required string src = 1;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 7109980..59381bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -313,6 +313,20 @@ message ZoneEncryptionInfoProto {
   required CipherSuiteProto suite = 1;
   required CryptoProtocolVersionProto cryptoProtocolVersion = 2;
   required string keyName = 3;
+  optional ReencryptionInfoProto reencryptionProto = 4;
+}
+
+/**
+ * Re-encryption information for an encryption zone
+ */
+message ReencryptionInfoProto {
+  required string ezKeyVersionName = 1;
+  required uint64 submissionTime = 2;
+  required bool canceled = 3;
+  required int64 numReencrypted = 4;
+  required int64 numFailures = 5;
+  optional uint64 completionTime = 6;
+  optional string lastFile = 7;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f4c383e..7f60000 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -881,6 +881,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
   public static final int    DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
   public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
+  public static final int    DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_DEFAULT = 100;
+  public static final String DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_KEY = "dfs.namenode.list.reencryption.status.num.responses";
   public static final String DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES =
       "dfs.namenode.list.openfiles.num.responses";
   public static final int    DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT =
@@ -889,6 +891,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;
   public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms";
   public static final int DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT = 3000;
+  public static final String DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY = "dfs.namenode.reencrypt.sleep.interval";
+  public static final String DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT = "1m";
+  public static final String DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY = "dfs.namenode.reencrypt.batch.size";
+  public static final int DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT = 1000;
+  public static final String DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY = "dfs.namenode.reencrypt.throttle.limit.handler.ratio";
+  public static final double DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT = 1.0;
+  public static final String DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY = "dfs.namenode.reencrypt.throttle.limit.updater.ratio";
+  public static final double DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT = 1.0;
+  public static final String DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY = "dfs.namenode.reencrypt.edek.threads";
+  public static final int DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT = 10;
 
   // Journal-node related configs. These are read on the JN side.
   public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index a446276..44d5216 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesRequestProto;
@@ -221,8 +222,12 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathR
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyRequestProto;
@@ -1483,6 +1488,37 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   }
 
   @Override
+  public ReencryptEncryptionZoneResponseProto reencryptEncryptionZone(
+      RpcController controller, ReencryptEncryptionZoneRequestProto req)
+      throws ServiceException {
+    try {
+      server.reencryptEncryptionZone(req.getZone(),
+          PBHelperClient.convert(req.getAction()));
+      return ReencryptEncryptionZoneResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  public ListReencryptionStatusResponseProto listReencryptionStatus(
+      RpcController controller, ListReencryptionStatusRequestProto req)
+      throws ServiceException {
+    try {
+      BatchedEntries<ZoneReencryptionStatus> entries = server
+          .listReencryptionStatus(req.getId());
+      ListReencryptionStatusResponseProto.Builder builder =
+          ListReencryptionStatusResponseProto.newBuilder();
+      builder.setHasMore(entries.hasMore());
+      for (int i=0; i<entries.size(); i++) {
+        builder.addStatuses(PBHelperClient.convert(entries.get(i)));
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
   public SetErasureCodingPolicyResponseProto setErasureCodingPolicy(
       RpcController controller, SetErasureCodingPolicyRequestProto req)
       throws ServiceException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
index 104d8c3..e4a035e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
@@ -42,4 +42,13 @@ public class EncryptionFaultInjector {
 
   @VisibleForTesting
   public void startFileAfterGenerateKey() throws IOException {}
+
+  @VisibleForTesting
+  public void reencryptEncryptedKeys() throws IOException {}
+
+  @VisibleForTesting
+  public void reencryptUpdaterProcessOneTask() throws IOException {}
+
+  @VisibleForTesting
+  public void reencryptUpdaterProcessCheckpoint() throws IOException {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
index 96e189b..d6302ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
@@ -19,32 +19,45 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.security.AccessControlException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants
     .CRYPTO_XATTR_ENCRYPTION_ZONE;
 
@@ -57,7 +70,7 @@ import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants
  */
 public class EncryptionZoneManager {
 
-  public static Logger LOG = LoggerFactory.getLogger(EncryptionZoneManager
+  public static final Logger LOG = LoggerFactory.getLogger(EncryptionZoneManager
       .class);
 
   /**
@@ -99,6 +112,91 @@ public class EncryptionZoneManager {
   private TreeMap<Long, EncryptionZoneInt> encryptionZones = null;
   private final FSDirectory dir;
   private final int maxListEncryptionZonesResponses;
+  private final int maxListRecncryptionStatusResponses;
+
+  private ThreadFactory reencryptionThreadFactory;
+  private ExecutorService reencryptHandlerExecutor;
+  private ReencryptionHandler reencryptionHandler;
+  // Reencryption status is kept here to decouple status listing (which should
+  // work as long as NN is up) from the actual handler (which only exists if
+  // keyprovider exists)
+  private final ReencryptionStatus reencryptionStatus;
+
+  public static final BatchedListEntries<ZoneReencryptionStatus> EMPTY_LIST =
+      new BatchedListEntries<>(new ArrayList<ZoneReencryptionStatus>(), false);
+
+  @VisibleForTesting
+  public void pauseReencryptForTesting() {
+    reencryptionHandler.pauseForTesting();
+  }
+
+  @VisibleForTesting
+  public void resumeReencryptForTesting() {
+    reencryptionHandler.resumeForTesting();
+  }
+
+  @VisibleForTesting
+  public void pauseForTestingAfterNthSubmission(final int count) {
+    reencryptionHandler.pauseForTestingAfterNthSubmission(count);
+  }
+
+  @VisibleForTesting
+  public void pauseReencryptUpdaterForTesting() {
+    reencryptionHandler.pauseUpdaterForTesting();
+  }
+
+  @VisibleForTesting
+  public void resumeReencryptUpdaterForTesting() {
+    reencryptionHandler.resumeUpdaterForTesting();
+  }
+
+  @VisibleForTesting
+  public void pauseForTestingAfterNthCheckpoint(final String zone,
+      final int count) throws IOException {
+    INodesInPath iip;
+    dir.readLock();
+    try {
+      iip = dir.resolvePath(dir.getPermissionChecker(), zone, DirOp.READ);
+    } finally {
+      dir.readUnlock();
+    }
+    reencryptionHandler
+        .pauseForTestingAfterNthCheckpoint(iip.getLastINode().getId(), count);
+  }
+
+  @VisibleForTesting
+  public void resetMetricsForTesting() {
+    reencryptionStatus.resetMetrics();
+  }
+
+  @VisibleForTesting
+  public ReencryptionStatus getReencryptionStatus() {
+    return reencryptionStatus;
+  }
+
+  @VisibleForTesting
+  public ZoneReencryptionStatus getZoneStatus(final String zone)
+      throws IOException {
+    final FSPermissionChecker pc = dir.getPermissionChecker();
+    final INode inode;
+    dir.getFSNamesystem().readLock();
+    dir.readLock();
+    try {
+      final INodesInPath iip = dir.resolvePath(pc, zone, DirOp.READ);
+      inode = iip.getLastINode();
+      if (inode == null) {
+        return null;
+      }
+      return getReencryptionStatus().getZoneStatus(inode.getId());
+    } finally {
+      dir.readUnlock();
+      dir.getFSNamesystem().readUnlock();
+    }
+  }
+
+  FSDirectory getFSDirectory() {
+    return dir;
+  }
 
   /**
    * Construct a new EncryptionZoneManager.
@@ -115,6 +213,50 @@ public class EncryptionZoneManager {
         DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES + " " +
             "must be a positive integer."
     );
+    if (getProvider() != null) {
+      reencryptionHandler = new ReencryptionHandler(this, conf);
+      reencryptionThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("reencryptionHandlerThread #%d").build();
+    }
+    maxListRecncryptionStatusResponses =
+        conf.getInt(DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_KEY,
+            DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_DEFAULT);
+    Preconditions.checkArgument(maxListRecncryptionStatusResponses >= 0,
+        DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_KEY +
+            " must be a positive integer."
+    );
+    reencryptionStatus = new ReencryptionStatus();
+  }
+
+  KeyProviderCryptoExtension getProvider() {
+    return dir.getProvider();
+  }
+
+  void startReencryptThreads() {
+    if (getProvider() == null) {
+      return;
+    }
+    Preconditions.checkNotNull(reencryptionHandler);
+    reencryptHandlerExecutor =
+        Executors.newSingleThreadExecutor(reencryptionThreadFactory);
+    reencryptHandlerExecutor.execute(reencryptionHandler);
+    reencryptionHandler.startUpdaterThread();
+  }
+
+  void stopReencryptThread() {
+    if (getProvider() == null || reencryptionHandler == null) {
+      return;
+    }
+    dir.writeLock();
+    try {
+      reencryptionHandler.stopThreads();
+    } finally {
+      dir.writeUnlock();
+    }
+    if (reencryptHandlerExecutor != null) {
+      reencryptHandlerExecutor.shutdownNow();
+      reencryptHandlerExecutor = null;
+    }
   }
 
   /**
@@ -157,7 +299,13 @@ public class EncryptionZoneManager {
   void removeEncryptionZone(Long inodeId) {
     assert dir.hasWriteLock();
     if (hasCreatedEncryptionZone()) {
-      encryptionZones.remove(inodeId);
+      if (encryptionZones.remove(inodeId) == null
+          || !getReencryptionStatus().hasRunningZone(inodeId)) {
+        return;
+      }
+      if (reencryptionHandler != null) {
+        reencryptionHandler.removeZone(inodeId);
+      }
     }
   }
 
@@ -173,13 +321,17 @@ public class EncryptionZoneManager {
   }
 
   /**
-   * Returns the path of the EncryptionZoneInt.
+   * Returns the full path from an INode id.
    * <p/>
    * Called while holding the FSDirectory lock.
    */
-  private String getFullPathName(EncryptionZoneInt ezi) {
+  String getFullPathName(Long nodeId) {
     assert dir.hasReadLock();
-    return dir.getInode(ezi.getINodeId()).getFullPathName();
+    INode inode = dir.getInode(nodeId);
+    if (inode == null) {
+      return null;
+    }
+    return inode.getFullPathName();
   }
 
   /**
@@ -247,7 +399,8 @@ public class EncryptionZoneManager {
     if (ezi == null) {
       return null;
     } else {
-      return new EncryptionZone(ezi.getINodeId(), getFullPathName(ezi),
+      return new EncryptionZone(ezi.getINodeId(),
+          getFullPathName(ezi.getINodeId()),
           ezi.getSuite(), ezi.getVersion(), ezi.getKeyName());
     }
   }
@@ -284,8 +437,8 @@ public class EncryptionZoneManager {
 
     if (srcInEZ) {
       if (srcParentEZI != dstParentEZI) {
-        final String srcEZPath = getFullPathName(srcParentEZI);
-        final String dstEZPath = getFullPathName(dstParentEZI);
+        final String srcEZPath = getFullPathName(srcParentEZI.getINodeId());
+        final String dstEZPath = getFullPathName(dstParentEZI.getINodeId());
         final StringBuilder sb = new StringBuilder(srcIIP.getPath());
         sb.append(" can't be moved from encryption zone ");
         sb.append(srcEZPath);
@@ -294,6 +447,24 @@ public class EncryptionZoneManager {
         sb.append(".");
         throw new IOException(sb.toString());
       }
+      checkMoveValidityForReencryption(srcIIP.getPath(),
+          srcParentEZI.getINodeId());
+    } else if (dstInEZ) {
+      checkMoveValidityForReencryption(dstIIP.getPath(),
+          dstParentEZI.getINodeId());
+    }
+  }
+
+  private void checkMoveValidityForReencryption(final String pathName,
+      final long zoneId) throws IOException {
+    assert dir.hasReadLock();
+    final ZoneReencryptionStatus zs = reencryptionStatus.getZoneStatus(zoneId);
+    if (zs != null && zs.getState() != ZoneReencryptionStatus.State.Completed) {
+      final StringBuilder sb = new StringBuilder(pathName);
+      sb.append(" can't be moved because encryption zone ");
+      sb.append(getFullPathName(zoneId));
+      sb.append(" is currently under re-encryption");
+      throw new IOException(sb.toString());
     }
   }
 
@@ -364,19 +535,13 @@ public class EncryptionZoneManager {
       /*
        Skip EZs that are only present in snapshots. Re-resolve the path to 
        see if the path's current inode ID matches EZ map's INode ID.
-       
+
        INode#getFullPathName simply calls getParent recursively, so will return
-       the INode's parents at the time it was snapshotted. It will not 
+       the INode's parents at the time it was snapshotted. It will not
        contain a reference INode.
       */
-      final String pathName = getFullPathName(ezi);
-      INode inode = dir.getInode(ezi.getINodeId());
-      INode lastINode = null;
-      if (inode.getParent() != null || inode.isRoot()) {
-        INodesInPath iip = dir.getINodesInPath(pathName, DirOp.READ_LINK);
-        lastINode = iip.getLastINode();
-      }
-      if (lastINode == null || lastINode.getId() != ezi.getINodeId()) {
+      final String pathName = getFullPathName(ezi.getINodeId());
+      if (!pathResolvesToId(ezi.getINodeId(), pathName)) {
         continue;
       }
       // Add the EZ to the result list
@@ -392,6 +557,156 @@ public class EncryptionZoneManager {
   }
 
   /**
+   * Resolves the path to an inode id, then checks if it's the same as the
+   * inode id passed in. This is necessary to filter out zones in snapshots.
+   * @param zoneId
+   * @param zonePath
+   * @return true if the path resolves to the id, false if not.
+   * @throws UnresolvedLinkException
+   */
+  private boolean pathResolvesToId(final long zoneId, final String zonePath)
+      throws UnresolvedLinkException, AccessControlException,
+      ParentNotDirectoryException {
+    assert dir.hasReadLock();
+    INode inode = dir.getInode(zoneId);
+    if (inode == null) {
+      return false;
+    }
+    INode lastINode = null;
+    if (inode.getParent() != null || inode.isRoot()) {
+      INodesInPath iip = dir.getINodesInPath(zonePath, DirOp.READ_LINK);
+      lastINode = iip.getLastINode();
+    }
+    if (lastINode == null || lastINode.getId() != zoneId) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Re-encrypts the given encryption zone path. If the given path is not the
+   * root of an encryption zone, an exception is thrown.
+   */
+  XAttr reencryptEncryptionZone(final INodesInPath zoneIIP,
+      final String keyVersionName) throws IOException {
+    assert dir.hasWriteLock();
+    if (reencryptionHandler == null) {
+      throw new IOException("No key provider configured, re-encryption "
+          + "operation is rejected");
+    }
+    final INode inode = zoneIIP.getLastINode();
+    final String zoneName = zoneIIP.getPath();
+    checkEncryptionZoneRoot(inode, zoneName);
+    if (getReencryptionStatus().hasRunningZone(inode.getId())) {
+      throw new IOException("Zone " + zoneName
+          + " is already submitted for re-encryption.");
+    }
+    LOG.info("Zone {}({}) is submitted for re-encryption.", zoneName,
+        inode.getId());
+    XAttr ret = FSDirEncryptionZoneOp
+        .updateReencryptionSubmitted(dir, zoneIIP, keyVersionName);
+    reencryptionHandler.notifyNewSubmission();
+    return ret;
+  }
+
+  /**
+   * Cancels the currently-running re-encryption of the given encryption zone.
+   * If the given path does not exist or is not the root of an encryption
+   * zone, an exception is thrown.
+   */
+  List<XAttr> cancelReencryptEncryptionZone(final INodesInPath zoneIIP)
+      throws IOException {
+    assert dir.hasWriteLock();
+    if (reencryptionHandler == null) {
+      throw new IOException("No key provider configured, re-encryption "
+          + "operation is rejected");
+    }
+    final String zoneName = zoneIIP.getPath();
+    checkEncryptionZoneRoot(zoneIIP.getLastINode(), zoneName);
+    final long zoneId = zoneIIP.getLastINode().getId();
+    reencryptionHandler.cancelZone(zoneId, zoneName);
+    LOG.info("Cancelled zone {}({}) for re-encryption.", zoneName, zoneId);
+    return FSDirEncryptionZoneOp.updateReencryptionFinish(dir, zoneIIP,
+        reencryptionStatus.getZoneStatus(zoneId));
+  }
+
+  /**
+   * Cursor-based listing of zone re-encryption status.
+   * <p/>
+   * Called while holding the FSDirectory lock.
+   */
+  BatchedListEntries<ZoneReencryptionStatus> listReencryptionStatus(
+      final long prevId) throws IOException {
+    assert dir.hasReadLock();
+    if (!hasCreatedEncryptionZone()) {
+      return ReencryptionStatus.EMPTY_LIST;
+    }
+
+    NavigableMap<Long, ZoneReencryptionStatus> stats =
+        reencryptionStatus.getZoneStatuses();
+
+    if (stats.isEmpty()) {
+      return EMPTY_LIST;
+    }
+
+    NavigableMap<Long, ZoneReencryptionStatus> tailMap =
+        stats.tailMap(prevId, false);
+    final int numResp =
+        Math.min(maxListRecncryptionStatusResponses, tailMap.size());
+    final List<ZoneReencryptionStatus> ret =
+        Lists.newArrayListWithExpectedSize(numResp);
+    int count = 0;
+    for (ZoneReencryptionStatus zs : tailMap.values()) {
+      final String name = getFullPathName(zs.getId());
+      if (name == null || !pathResolvesToId(zs.getId(), name)) {
+        continue;
+      }
+      zs.setZoneName(name);
+      ret.add(zs);
+      ++count;
+      if (count >= numResp) {
+        break;
+      }
+    }
+    final boolean hasMore = (numResp < tailMap.size());
+    return new BatchedListEntries<>(ret, hasMore);
+  }
+
+  /**
+   * Return whether an INode is an encryption zone root.
+   */
+  boolean isEncryptionZoneRoot(final INode inode, final String name)
+      throws FileNotFoundException {
+    assert dir.hasReadLock();
+    if (inode == null) {
+      throw new FileNotFoundException("INode does not exist for " + name);
+    }
+    if (!inode.isDirectory()) {
+      return false;
+    }
+    if (!hasCreatedEncryptionZone()
+        || !encryptionZones.containsKey(inode.getId())) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Check that an INode is an encryption zone root, throwing if it is not.
+   *
+   * @param inode the zone inode
+   * @throws IOException if the inode is not a directory,
+   *                     or is a directory but not the root of an EZ.
+   */
+  void checkEncryptionZoneRoot(final INode inode, final String name)
+      throws IOException {
+    if (!isEncryptionZoneRoot(inode, name)) {
+      throw new IOException("Path " + name + " is not the root of an"
+          + " encryption zone.");
+    }
+  }
+
+  /**
    * @return number of encryption zones.
    */
   public int getNumEncryptionZones() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/4] hadoop git commit: HDFS-10899. Add functionality to re-encrypt EDEKs.

Posted by xi...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
index 22039d1..2552cf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
@@ -31,6 +32,7 @@ import java.util.Map;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.fs.FileEncryptionInfo;
@@ -42,15 +44,22 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ZoneEncryptionInfoProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
 import org.apache.hadoop.security.SecurityUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.util.Time;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 /**
@@ -216,18 +225,206 @@ final class FSDirEncryptionZoneOp {
     }
   }
 
+  static void reencryptEncryptionZone(final FSDirectory fsd,
+      final String zone, final String keyVersionName,
+      final boolean logRetryCache) throws IOException {
+    final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+    final FSPermissionChecker pc = fsd.getPermissionChecker();
+    fsd.writeLock();
+    try {
+      final INodesInPath iip = fsd.resolvePath(pc, zone, DirOp.WRITE);
+      final XAttr xattr = fsd.ezManager
+          .reencryptEncryptionZone(iip, keyVersionName);
+      xAttrs.add(xattr);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logSetXAttrs(zone, xAttrs, logRetryCache);
+  }
+
+  static void cancelReencryptEncryptionZone(final FSDirectory fsd,
+      final String zone, final boolean logRetryCache) throws IOException {
+    final List<XAttr> xattrs;
+    final FSPermissionChecker pc = fsd.getPermissionChecker();
+    fsd.writeLock();
+    try {
+      final INodesInPath iip = fsd.resolvePath(pc, zone, DirOp.WRITE);
+      xattrs = fsd.ezManager.cancelReencryptEncryptionZone(iip);
+    } finally {
+      fsd.writeUnlock();
+    }
+    if (xattrs != null && !xattrs.isEmpty()) {
+      fsd.getEditLog().logSetXAttrs(zone, xattrs, logRetryCache);
+    }
+  }
+
+  static BatchedListEntries<ZoneReencryptionStatus> listReencryptionStatus(
+      final FSDirectory fsd, final long prevId)
+      throws IOException {
+    fsd.readLock();
+    try {
+      return fsd.ezManager.listReencryptionStatus(prevId);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  /**
+   * Update re-encryption progress (submitted). Caller should
+   * logSync after calling this, outside of the FSN lock.
+   * <p>
+   * The reencryption status is updated during SetXAttrs.
+   */
+  static XAttr updateReencryptionSubmitted(final FSDirectory fsd,
+      final INodesInPath iip, final String ezKeyVersionName)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    Preconditions.checkNotNull(ezKeyVersionName, "ezKeyVersionName is null.");
+    final ZoneEncryptionInfoProto zoneProto = getZoneEncryptionInfoProto(iip);
+    Preconditions.checkNotNull(zoneProto, "ZoneEncryptionInfoProto is null.");
+
+    final ReencryptionInfoProto newProto = PBHelperClient
+        .convert(ezKeyVersionName, Time.now(), false, 0, 0, null, null);
+    final ZoneEncryptionInfoProto newZoneProto = PBHelperClient
+        .convert(PBHelperClient.convert(zoneProto.getSuite()),
+            PBHelperClient.convert(zoneProto.getCryptoProtocolVersion()),
+            zoneProto.getKeyName(), newProto);
+
+    final XAttr xattr = XAttrHelper
+        .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, newZoneProto.toByteArray());
+    final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+    xattrs.add(xattr);
+    FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xattrs,
+        EnumSet.of(XAttrSetFlag.REPLACE));
+    return xattr;
+  }
+
+  /**
+   * Update re-encryption progress (start, checkpoint). Caller should
+   * logSync after calling this, outside of the FSN lock.
+   * <p>
+   * The reencryption status is updated during SetXAttrs.
+   * Original reencryption status is passed in to get existing information
+   * such as ezkeyVersionName and submissionTime.
+   */
+  static XAttr updateReencryptionProgress(final FSDirectory fsd,
+      final INode zoneNode, final ZoneReencryptionStatus origStatus,
+      final String lastFile, final long numReencrypted, final long numFailures)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    Preconditions.checkNotNull(zoneNode, "Zone node is null");
+    INodesInPath iip = INodesInPath.fromINode(zoneNode);
+    final ZoneEncryptionInfoProto zoneProto = getZoneEncryptionInfoProto(iip);
+    Preconditions.checkNotNull(zoneProto, "ZoneEncryptionInfoProto is null.");
+    Preconditions.checkNotNull(origStatus, "Null status for " + iip.getPath());
+
+    final ReencryptionInfoProto newProto = PBHelperClient
+        .convert(origStatus.getEzKeyVersionName(),
+            origStatus.getSubmissionTime(), false,
+            origStatus.getFilesReencrypted() + numReencrypted,
+            origStatus.getNumReencryptionFailures() + numFailures, null,
+            lastFile);
+
+    final ZoneEncryptionInfoProto newZoneProto = PBHelperClient
+        .convert(PBHelperClient.convert(zoneProto.getSuite()),
+            PBHelperClient.convert(zoneProto.getCryptoProtocolVersion()),
+            zoneProto.getKeyName(), newProto);
+
+    final XAttr xattr = XAttrHelper
+        .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, newZoneProto.toByteArray());
+    final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+    xattrs.add(xattr);
+    FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xattrs,
+        EnumSet.of(XAttrSetFlag.REPLACE));
+    return xattr;
+  }
+
+  /**
+   * Log re-encrypt complete (cancel, or 100% re-encrypt) to edits.
+   * Caller should logSync after calling this, outside of the FSN lock.
+   * <p>
+   * Original reencryption status is passed in to get existing information,
+   * this should include whether it is finished due to cancellation.
+   * The reencryption status is updated during SetXAttrs for completion time.
+   */
+  static List<XAttr> updateReencryptionFinish(final FSDirectory fsd,
+      final INodesInPath zoneIIP, final ZoneReencryptionStatus origStatus)
+      throws IOException {
+    assert origStatus != null;
+    assert fsd.hasWriteLock();
+    fsd.ezManager.getReencryptionStatus()
+        .markZoneCompleted(zoneIIP.getLastINode().getId());
+    final XAttr xattr =
+        generateNewXAttrForReencryptionFinish(zoneIIP, origStatus);
+    final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+    xattrs.add(xattr);
+    FSDirXAttrOp.unprotectedSetXAttrs(fsd, zoneIIP, xattrs,
+        EnumSet.of(XAttrSetFlag.REPLACE));
+    return xattrs;
+  }
+
+  static XAttr generateNewXAttrForReencryptionFinish(final INodesInPath iip,
+      final ZoneReencryptionStatus status) throws IOException {
+    final ZoneEncryptionInfoProto zoneProto = getZoneEncryptionInfoProto(iip);
+    final ReencryptionInfoProto newRiProto = PBHelperClient
+        .convert(status.getEzKeyVersionName(), status.getSubmissionTime(),
+            status.isCanceled(), status.getFilesReencrypted(),
+            status.getNumReencryptionFailures(), Time.now(), null);
+
+    final ZoneEncryptionInfoProto newZoneProto = PBHelperClient
+        .convert(PBHelperClient.convert(zoneProto.getSuite()),
+            PBHelperClient.convert(zoneProto.getCryptoProtocolVersion()),
+            zoneProto.getKeyName(), newRiProto);
+
+    final XAttr xattr = XAttrHelper
+        .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, newZoneProto.toByteArray());
+    return xattr;
+  }
+
+  private static ZoneEncryptionInfoProto getZoneEncryptionInfoProto(
+      final INodesInPath iip) throws IOException {
+    final XAttr fileXAttr = FSDirXAttrOp
+        .unprotectedGetXAttrByPrefixedName(iip, CRYPTO_XATTR_ENCRYPTION_ZONE);
+    if (fileXAttr == null) {
+      throw new IOException(
+          "Could not find reencryption XAttr for file " + iip.getPath());
+    }
+    try {
+      return ZoneEncryptionInfoProto.parseFrom(fileXAttr.getValue());
+    } catch (InvalidProtocolBufferException e) {
+      throw new IOException(
+          "Could not parse file encryption info for " + "inode " + iip
+              .getPath(), e);
+    }
+  }
+
+  /**
+   * Save the batch's edeks to file xattrs.
+   */
+  static void saveFileXAttrsForBatch(FSDirectory fsd,
+      List<FileEdekInfo> batch) {
+    assert fsd.getFSNamesystem().hasWriteLock();
+    if (batch != null && !batch.isEmpty()) {
+      for (FileEdekInfo entry : batch) {
+        final INode inode = fsd.getInode(entry.getInodeId());
+        Preconditions.checkNotNull(inode);
+        fsd.getEditLog().logSetXAttrs(inode.getFullPathName(),
+            inode.getXAttrFeature().getXAttrs(), false);
+      }
+    }
+  }
+
   /**
    * Set the FileEncryptionInfo for an INode.
    *
    * @param fsd fsdirectory
-   * @param src the path of a directory which will be the root of the
-   *            encryption zone.
    * @param info file encryption information
+   * @param flag action when setting xattr. Either CREATE or REPLACE.
    * @throws IOException
    */
   static void setFileEncryptionInfo(final FSDirectory fsd,
-      final INodesInPath iip, final FileEncryptionInfo info)
-          throws IOException {
+      final INodesInPath iip, final FileEncryptionInfo info,
+      final XAttrSetFlag flag) throws IOException {
     // Make the PB for the xattr
     final HdfsProtos.PerFileEncryptionInfoProto proto =
         PBHelperClient.convertPerFileEncInfo(info);
@@ -238,8 +435,7 @@ final class FSDirEncryptionZoneOp {
     xAttrs.add(fileEncryptionAttr);
     fsd.writeLock();
     try {
-      FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xAttrs,
-                                        EnumSet.of(XAttrSetFlag.CREATE));
+      FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xAttrs, EnumSet.of(flag));
     } finally {
       fsd.writeUnlock();
     }
@@ -500,4 +696,34 @@ final class FSDirEncryptionZoneOp {
       this.edek = edek;
     }
   }
+
+  /**
+   * Get the current key version of the given EZ's key from the key provider.
+   * @param dir the FSDirectory
+   * @param zone the encryption zone
+   * @param pc the permission checker
+   * @return the KeyVersion returned by the provider's getCurrentKey.
+   * @throws IOException e.g. FileNotFoundException if zone does not exist
+   */
+  static KeyVersion getLatestKeyVersion(final FSDirectory dir,
+      final String zone, final FSPermissionChecker pc) throws IOException {
+    final EncryptionZone ez;
+    assert dir.getProvider() != null;
+    dir.readLock();
+    try {
+      final INodesInPath iip = dir.resolvePath(pc, zone, DirOp.READ);
+      if (iip.getLastINode() == null) {
+        throw new FileNotFoundException(zone + " does not exist.");
+      }
+      dir.ezManager.checkEncryptionZoneRoot(iip.getLastINode(), iip.getPath());
+      ez = FSDirEncryptionZoneOp.getEZForPath(dir, iip);
+    } finally {
+      dir.readUnlock();
+    }
+    // Contact the key provider only after the read lock has been released.
+    KeyVersion currKv = dir.getProvider().getCurrentKey(ez.getKeyName());
+    Preconditions.checkNotNull(currKv,
+        "No current key versions for key name " + ez.getKeyName());
+    return currKv;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 7ab05d7..012e916 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -397,7 +398,8 @@ class FSDirWriteFileOp {
         newNode.getFileUnderConstructionFeature().getClientName(),
         newNode.getId());
     if (feInfo != null) {
-      FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo);
+      FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo,
+          XAttrSetFlag.CREATE);
     }
     setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
     fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index ddc088c..acdade7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.security.AccessControlException;
@@ -275,6 +276,12 @@ class FSDirXAttrOp {
             PBHelperClient.convert(ezProto.getSuite()),
             PBHelperClient.convert(ezProto.getCryptoProtocolVersion()),
             ezProto.getKeyName());
+
+        if (ezProto.hasReencryptionProto()) {
+          ReencryptionInfoProto reProto = ezProto.getReencryptionProto();
+          fsd.ezManager.getReencryptionStatus()
+              .updateZoneStatus(inode.getId(), iip.getPath(), reProto);
+        }
       }
 
       if (!isFile && SECURITY_XATTR_UNREADABLE_BY_SUPERUSER.equals(xaName)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 87b1156..e6aa533 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -1358,12 +1359,18 @@ public class FSDirectory implements Closeable {
     }
     try {
       final HdfsProtos.ZoneEncryptionInfoProto ezProto =
-          HdfsProtos.ZoneEncryptionInfoProto.parseFrom(
-              xattr.getValue());
+          HdfsProtos.ZoneEncryptionInfoProto.parseFrom(xattr.getValue());
       ezManager.unprotectedAddEncryptionZone(inode.getId(),
           PBHelperClient.convert(ezProto.getSuite()),
           PBHelperClient.convert(ezProto.getCryptoProtocolVersion()),
           ezProto.getKeyName());
+      if (ezProto.hasReencryptionProto()) {
+        final ReencryptionInfoProto reProto = ezProto.getReencryptionProto();
+        // The inode's parents may not be loaded yet during fsimage loading, so
+        // the full path cannot be set now. Pass in null to indicate that.
+        ezManager.getReencryptionStatus()
+            .updateZoneStatus(inode.getId(), null, reProto);
+      }
     } catch (InvalidProtocolBufferException e) {
       NameNode.LOG.warn("Error parsing protocol buffer of " +
           "EZ XAttr " + xattr.getName() + " dir:" + inode.getFullPathName());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 2313335..12d96d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -89,9 +89,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import org.apache.hadoop.hdfs.protocol.BlocksStats;
 import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.util.Time.now;
@@ -199,6 +201,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
@@ -1230,6 +1233,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       dir.updateCountForQuota();
       // Enable quota checks.
       dir.enableQuotaChecks();
+      dir.ezManager.startReencryptThreads();
+
       if (haEnabled) {
         // Renew all of the leases before becoming active.
         // This is because, while we were in standby mode,
@@ -1321,6 +1326,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // so that the tailer starts from the right spot.
         getFSImage().updateLastAppliedTxIdFromWritten();
       }
+      if (dir != null) {
+        dir.ezManager.stopReencryptThread();
+      }
       if (cacheManager != null) {
         cacheManager.stopMonitorThread();
         cacheManager.clearDirectiveStats();
@@ -7031,6 +7039,84 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  void reencryptEncryptionZone(final String zone, final ReencryptAction action,
+      final boolean logRetryCache) throws IOException {
+    boolean success = false; // set only after the whole operation returned
+    try { // checks run inside the try so that rejected calls are audited too
+      Preconditions.checkNotNull(zone, "zone is null.");
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("NameNode in safemode, cannot " + action
+          + " re-encryption on zone " + zone);
+      reencryptEncryptionZoneInt(zone, action, logRetryCache);
+      success = true;
+    } finally {
+      logAuditEvent(success, action + "reencryption", zone, null, null);
+    }
+  }
+
+  BatchedListEntries<ZoneReencryptionStatus> listReencryptionStatus(
+      final long prevId) throws IOException {
+    final String operationName = "listReencryptionStatus";
+    boolean success = false;
+    checkSuperuserPrivilege(); // pre-lock check; not audited by this method
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkSuperuserPrivilege(); // same checks repeated under the read lock
+      checkOperation(OperationCategory.READ);
+      final BatchedListEntries<ZoneReencryptionStatus> ret =
+          FSDirEncryptionZoneOp.listReencryptionStatus(dir, prevId);
+      success = true;
+      return ret;
+    } finally {
+      readUnlock(operationName);
+      logAuditEvent(success, operationName, null);
+    }
+  }
+
+  private void reencryptEncryptionZoneInt(final String zone,
+      final ReencryptAction action, final boolean logRetryCache)
+      throws IOException {
+    if (getProvider() == null) {
+      throw new IOException("No key provider configured, re-encryption "
+          + "operation is rejected");
+    }
+    FSPermissionChecker pc = getPermissionChecker();
+    // Get the key version outside of the lock. Its version name is used as
+    // the target keyVersion for the entire re-encryption: each edek's
+    // keyVersion is compared with it, and the KMS is only contacted if the
+    // edek's keyVersion is different.
+    final KeyVersion kv =
+        FSDirEncryptionZoneOp.getLatestKeyVersion(dir, zone, pc);
+    provider.invalidateCache(kv.getName()); // done for CANCEL as well as START
+    writeLock();
+    try {
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode(
+          "NameNode in safemode, cannot " + action + " re-encryption on zone "
+              + zone);
+      switch (action) {
+      case START:
+        FSDirEncryptionZoneOp
+            .reencryptEncryptionZone(dir, zone, kv.getVersionName(),
+                logRetryCache);
+        break;
+      case CANCEL:
+        FSDirEncryptionZoneOp
+            .cancelReencryptEncryptionZone(dir, zone, logRetryCache);
+        break;
+      default:
+        throw new IOException(
+            "Re-encryption action " + action + " is not supported");
+      }
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync(); // sync the edit log after releasing the lock
+  }
+
   /**
    * Set an erasure coding policy on the given path.
    * @param srcArg  The path of the target directory.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 7871202..3fbb7bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -116,6 +117,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.BlocksStats;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -2052,6 +2054,31 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // ClientProtocol
+  public void reencryptEncryptionZone(final String zone,
+      final ReencryptAction action) throws IOException {
+    checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
+    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // retry cache entry is already successful: skip re-execution
+    }
+    boolean success = false;
+    try {
+      namesystem.reencryptEncryptionZone(zone, action, cacheEntry != null);
+      success = true;
+    } finally {
+      RetryCache.setState(cacheEntry, success); // record the outcome
+    }
+  }
+
+  @Override // ClientProtocol
+  public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(
+      final long prevId) throws IOException {
+    checkNNStartup();
+    return namesystem.listReencryptionStatus(prevId); // no retry cache used
+  }
+
+  @Override // ClientProtocol
   public void setErasureCodingPolicy(String src, String ecPolicyName)
       throws IOException {
     checkNNStartup();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
new file mode 100644
index 0000000..729b894
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -0,0 +1,940 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY;
+
+/**
+ * Class for handling re-encrypt EDEK operations.
+ * <p>
+ * For each EZ, ReencryptionHandler walks the tree in a depth-first order,
+ * and submits batches of (files + existing edeks) as re-encryption tasks
+ * to a thread pool. Each thread in the pool then contacts the KMS to
+ * re-encrypt the edeks. ReencryptionUpdater tracks the tasks and updates
+ * file xattrs with the new edeks.
+ * <p>
+ * File renames are disabled in the EZ that's being re-encrypted. Newly created
+ * files will have new edeks, because the edek cache is drained upon the
+ * submission of a re-encryption command.
+ * <p>
+ * It is assumed only 1 ReencryptionHandler will be running, because:
+ *   1. The bottleneck of the entire re-encryption appears to be on the KMS.
+ *   2. Even with multiple handlers, since updater requires writelock and is
+ * single-threaded, the performance gain is limited.
+ * <p>
+ * This class uses the FSDirectory lock for synchronization.
+ */
+@InterfaceAudience.Private
+public class ReencryptionHandler implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ReencryptionHandler.class);
+
+  // 2000 is based on buffer size = 512 * 1024, and SetXAttr op size is
+  // 100 - 200 bytes (depending on the xattr value).
+  // The buffer size is hard-coded, see outputBufferCapacity from QJM.
+  private static final int MAX_BATCH_SIZE_WITHOUT_FLOODING = 2000;
+
+  private final EncryptionZoneManager ezManager;
+  private final FSDirectory dir;
+  private final long interval;
+  private final int reencryptBatchSize;
+  private double throttleLimitHandlerRatio;
+  private final int reencryptThreadPoolSize;
+  // stopwatches for throttling
+  private final StopWatch throttleTimerAll = new StopWatch();
+  private final StopWatch throttleTimerLocked = new StopWatch();
+
+  private ExecutorCompletionService<ReencryptionTask> batchService;
+  private BlockingQueue<Runnable> taskQueue;
+  // protected by ReencryptionHandler object lock
+  private final Map<Long, ZoneSubmissionTracker> submissions =
+      new ConcurrentHashMap<>();
+
+  // The current batch that the handler is working on. Handler is designed to
+  // be single-threaded, see class javadoc for more details.
+  private ReencryptionBatch currentBatch;
+
+  private final ReencryptionUpdater reencryptionUpdater;
+  private ExecutorService updaterExecutor;
+
+  // Vars for unit tests.
+  private volatile boolean shouldPauseForTesting = false;
+  private volatile int pauseAfterNthSubmission = 0;
+
+  /**
+   * Cancel all submitted EDEK re-encryption tasks, then shut down the
+   * executor that runs the re-encryption updater, if one was started.
+   */
+  void stopThreads() {
+    assert dir.hasWriteLock();
+    for (ZoneSubmissionTracker tracker : submissions.values()) {
+      tracker.cancelAllTasks();
+    }
+    if (updaterExecutor != null) {
+      updaterExecutor.shutdownNow();
+    }
+  }
+
+  /**
+   * Start the re-encryption updater thread.
+   */
+  void startUpdaterThread() {
+    updaterExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("reencryptionUpdaterThread #%d").build());
+    updaterExecutor.execute(reencryptionUpdater);
+  }
+
+  @VisibleForTesting
+  synchronized void pauseForTesting() {
+    shouldPauseForTesting = true;
+    LOG.info("Pausing re-encrypt handler for testing.");
+    notify(); // wake a thread waiting on this handler's monitor
+  }
+
+  @VisibleForTesting
+  synchronized void resumeForTesting() {
+    shouldPauseForTesting = false;
+    LOG.info("Resuming re-encrypt handler for testing.");
+    notify(); // lets a thread blocked in wait() re-check the pause flag
+  }
+
+  @VisibleForTesting
+  void pauseForTestingAfterNthSubmission(final int count) {
+    assert pauseAfterNthSubmission == 0; // no earlier count may be pending
+    pauseAfterNthSubmission = count;
+  }
+
+  @VisibleForTesting // test hook: forwards to the ReencryptionUpdater
+  void pauseUpdaterForTesting() {
+    reencryptionUpdater.pauseForTesting();
+  }
+
+  @VisibleForTesting // test hook: forwards to the ReencryptionUpdater
+  void resumeUpdaterForTesting() {
+    reencryptionUpdater.resumeForTesting();
+  }
+
+  @VisibleForTesting // test hook: forwards to the ReencryptionUpdater
+  void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
+    reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
+  }
+
+  private synchronized void checkPauseForTesting() throws InterruptedException {
+    assert !dir.hasReadLock(); // no read lock may be held while waiting
+    assert !dir.getFSNamesystem().hasReadLock();
+    while (shouldPauseForTesting) { // re-check the flag after every wakeup
+      LOG.info("Sleeping in the re-encrypt handler for unit test.");
+      wait();
+      LOG.info("Continuing re-encrypt handler after pausing.");
+    }
+  }
+
+  /** Read and validate the re-encryption settings and build the edek pool. */
+  ReencryptionHandler(final EncryptionZoneManager ezMgr,
+      final Configuration conf) {
+    this.ezManager = ezMgr;
+    Preconditions.checkNotNull(ezManager.getProvider(),
+        "No provider set, cannot re-encrypt");
+    this.dir = ezMgr.getFSDirectory();
+    this.interval =
+        conf.getTimeDuration(DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY,
+            DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    Preconditions.checkArgument(interval > 0,
+        DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY + " is not positive.");
+    this.reencryptBatchSize = conf.getInt(DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY,
+        DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT);
+    Preconditions.checkArgument(reencryptBatchSize > 0,
+        DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY + " is not positive.");
+    if (reencryptBatchSize > MAX_BATCH_SIZE_WITHOUT_FLOODING) {
+      LOG.warn("Re-encryption batch size is {}. It could cause edit log buffer "
+          + "to be full and trigger a logSync within the writelock, greatly "
+          + "impacting namenode throughput.", reencryptBatchSize);
+    }
+    this.throttleLimitHandlerRatio =
+        conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
+            DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT);
+    LOG.info("Configured throttleLimitHandlerRatio={} for re-encryption",
+        throttleLimitHandlerRatio);
+    Preconditions.checkArgument(throttleLimitHandlerRatio > 0.0f,
+        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY
+            + " is not positive.");
+    this.reencryptThreadPoolSize =
+        conf.getInt(DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY,
+            DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT);
+    // A non-positive size would otherwise fail in ThreadPoolExecutor.
+    Preconditions.checkArgument(reencryptThreadPoolSize > 0,
+        DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY + " is not positive.");
+    taskQueue = new LinkedBlockingQueue<>();
+    ThreadPoolExecutor threadPool =
+        new ThreadPoolExecutor(reencryptThreadPoolSize, reencryptThreadPoolSize,
+            60, TimeUnit.SECONDS, taskQueue, new Daemon.DaemonFactory() {
+              private final AtomicInteger ind = new AtomicInteger(0);
+              @Override
+              public Thread newThread(Runnable r) {
+                Thread t = super.newThread(r);
+                t.setName("reencryption edek Thread-" + ind.getAndIncrement());
+                return t;
+              }
+            }, new ThreadPoolExecutor.CallerRunsPolicy() {
+              @Override
+              public void rejectedExecution(Runnable runnable,
+                  ThreadPoolExecutor e) {
+                LOG.info("Execution rejected, executing in current thread");
+                super.rejectedExecution(runnable, e);
+              }
+            });
+    threadPool.allowCoreThreadTimeOut(true);
+    this.batchService = new ExecutorCompletionService<>(threadPool);
+    reencryptionUpdater =
+        new ReencryptionUpdater(dir, batchService, this, conf);
+    currentBatch = new ReencryptionBatch(reencryptBatchSize);
+  }
+
+  ReencryptionStatus getReencryptionStatus() {
+    return ezManager.getReencryptionStatus(); // delegates to the EZ manager
+  }
+
+  void cancelZone(final long zoneId, final String zoneName) throws IOException {
+    assert dir.hasWriteLock();
+    final ZoneReencryptionStatus status =
+        getReencryptionStatus().getZoneStatus(zoneId);
+    if (status == null || status.getState() == State.Completed) {
+      throw new IOException("Zone " + zoneName + " is not under re-encryption");
+    }
+    status.cancel();
+    final ZoneSubmissionTracker tracker = submissions.get(zoneId);
+    if (tracker != null) {
+      tracker.cancelAllTasks(); // also cancel tasks that were submitted
+    }
+  }
+
+  void removeZone(final long zoneId) {
+    assert dir.hasWriteLock();
+    LOG.info("Removing zone {} from re-encryption.", zoneId);
+    ZoneSubmissionTracker zst = submissions.get(zoneId);
+    if (zst != null) {
+      zst.cancelAllTasks();
+    }
+    submissions.remove(zoneId);
+    getReencryptionStatus().removeZone(zoneId);
+  }
+
+  ZoneSubmissionTracker getTracker(final long zoneId) {
+    assert dir.hasReadLock(); // result was discarded before; now enforced
+    return unprotectedGetTracker(zoneId);
+  }
+
+  /**
+   * Get the tracker without holding the FSDirectory lock. This is only used
+   * for testing, when the updater checks whether it should pause.
+   */
+  ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) {
+    return submissions.get(zoneId);
+  }
+
+  /**
+   * Add a dummy tracker (with 1 task that has 0 files to re-encrypt)
+   * for the zone. This is necessary to complete the re-encryption in case
+   * no file in the entire zone needs re-encryption at all. We cannot simply
+   * update zone status and set zone xattrs, because in the handler we only hold
+   * readlock, and setting xattrs requires upgrading to a writelock.
+   *
+   * @param zoneId id of the encryption zone's root inode
+   */
+  void addDummyTracker(final long zoneId) {
+    assert dir.hasReadLock();
+    assert !submissions.containsKey(zoneId);
+    final ZoneSubmissionTracker zst = new ZoneSubmissionTracker();
+    zst.setSubmissionDone();
+
+    final Future<ReencryptionTask> future = batchService.submit(
+        new EDEKReencryptCallable(zoneId, new ReencryptionBatch(), this));
+    zst.addTask(future);
+    submissions.put(zoneId, zst);
+  }
+
+  /**
+   * Main loop. It takes at most 1 zone per scan, and executes until the zone
+   * is completed.
+   * See {@link #reencryptEncryptionZone(long)}.
+   */
+  @Override
+  public void run() {
+    LOG.info("Starting up re-encrypt thread with interval={} millisecond.",
+        interval);
+    while (true) {
+      try {
+        synchronized (this) {
+          wait(interval); // notify() (e.g. resumeForTesting) ends it early
+        }
+        checkPauseForTesting();
+      } catch (InterruptedException ie) {
+        LOG.info("Re-encrypt handler interrupted. Exiting");
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      final Long zoneId;
+      dir.readLock();
+      try {
+        zoneId = getReencryptionStatus().getNextUnprocessedZone();
+        if (zoneId == null) {
+          // empty queue: go back to waiting (the finally still unlocks).
+          continue;
+        }
+        LOG.info("Executing re-encrypt commands on zone {}. Current zones:{}",
+            zoneId, getReencryptionStatus());
+      } finally {
+        dir.readUnlock();
+      }
+
+      try {
+        reencryptEncryptionZone(zoneId);
+      } catch (RetriableException | SafeModeException re) {
+        LOG.info("Re-encryption caught exception, will retry", re);
+        getReencryptionStatus().markZoneForRetry(zoneId);
+      } catch (IOException ioe) { // not marked for retry; the loop continues
+        LOG.warn("IOException caught when re-encrypting zone {}", zoneId, ioe);
+      } catch (InterruptedException ie) {
+        LOG.info("Re-encrypt handler interrupted. Exiting.");
+        Thread.currentThread().interrupt();
+        return;
+      } catch (Throwable t) { // anything else ends the handler thread
+        LOG.error("Re-encrypt handler thread exiting. Exception caught when"
+            + " re-encrypting zone {}.", zoneId, t);
+        return;
+      }
+    }
+  }
+
+  /**
+   * Re-encrypts a zone by recursively iterating all paths inside the zone,
+   * in lexicographic order.
+   * Files are re-encrypted, and subdirs are processed during iteration.
+   *
+   * @param zoneId the inode id of the zone's root directory.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void reencryptEncryptionZone(final long zoneId)
+      throws IOException, InterruptedException {
+    throttleTimerAll.reset().start();
+    throttleTimerLocked.reset();
+    final INode zoneNode;
+    final ZoneReencryptionStatus zs;
+
+    readLock();
+    try {
+      getReencryptionStatus().markZoneStarted(zoneId);
+      zoneNode = dir.getInode(zoneId);
+      // make sure the zone root still exists and is a directory
+      if (zoneNode == null) {
+        LOG.info("Directory with id {} removed during re-encrypt, skipping",
+            zoneId);
+        return;
+      }
+      if (!zoneNode.isDirectory()) {
+        LOG.info("Cannot re-encrypt directory with id {} because it's not a"
+            + " directory.", zoneId);
+        return;
+      }
+
+      zs = getReencryptionStatus().getZoneStatus(zoneId);
+      assert zs != null;
+      // Only costly log FullPathName here once, and use id elsewhere.
+      LOG.info("Re-encrypting zone {}(id={})", zoneNode.getFullPathName(),
+          zoneId);
+      if (zs.getLastCheckpointFile() == null) {
+        // new re-encryption: walk the zone from the beginning
+        reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME,
+            zs.getEzKeyVersionName());
+      } else {
+        // resuming from a past re-encryption
+        restoreFromLastProcessedFile(zoneId, zs);
+      }
+      // submit the last batch and mark the zone's submission as done
+      submitCurrentBatch(zoneId);
+      LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
+      reencryptionUpdater.markZoneSubmissionDone(zoneId);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Finishes the re-encryption of a zone: logs the final counters, drops the
+   * zone's submission tracker and delegates the zone update to
+   * {@code FSDirEncryptionZoneOp.updateReencryptionFinish}.
+   * <p>
+   * Both the FSDirectory and the FSNamesystem write locks are asserted.
+   *
+   * @param zoneNode inode of the encryption zone that finished.
+   * @return whatever {@code updateReencryptionFinish} returns.
+   * @throws IOException if thrown while resolving or updating the zone.
+   */
+  List<XAttr> completeReencryption(final INode zoneNode) throws IOException {
+    assert dir.hasWriteLock();
+    assert dir.getFSNamesystem().hasWriteLock();
+    final Long id = zoneNode.getId();
+    final ZoneReencryptionStatus status =
+        getReencryptionStatus().getZoneStatus(id);
+    assert status != null;
+    LOG.info("Re-encryption completed on zone {}. Re-encrypted {} files,"
+            + " failures encountered: {}.", zoneNode.getFullPathName(),
+        status.getFilesReencrypted(), status.getNumReencryptionFailures());
+    submissions.remove(id);
+    // NOTE(review): per the original comment, the call below also removes
+    // the zone from reencryptionStatus - not visible from here, confirm.
+    final INodesInPath zoneIIP = INodesInPath.fromINode(zoneNode);
+    return FSDirEncryptionZoneOp
+        .updateReencryptionFinish(dir, zoneIIP, status);
+  }
+
+  /**
+   * Restore the re-encryption from the progress inside ReencryptionStatus.
+   * This means start from exactly the lastProcessedFile (LPF), skipping all
+   * earlier paths in lexicographic order. Lexicographically-later directories
+   * on the LPF parent paths are added to subdirs.
+   */
+  private void restoreFromLastProcessedFile(final long zoneId,
+      final ZoneReencryptionStatus zs)
+      throws IOException, InterruptedException {
+    final INodeDirectory parent;
+    final byte[] startAfter;
+    final INodesInPath lpfIIP =
+        dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ);
+    parent = lpfIIP.getLastINode().getParent();
+    startAfter = lpfIIP.getLastINode().getLocalNameBytes();
+    reencryptDir(parent, zoneId, startAfter, zs.getEzKeyVersionName());
+  }
+
+  /**
+   * Iterate through all files directly inside parent, and recurse down
+   * directories. The listing is done in batch, and can optionally start after
+   * a position.
+   * <p>
+   * Each batch is then sent to the threadpool, where KMS will be contacted and
+   * edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed
+   * from the threadpool.
+   * <p>
+   * The iteration of the inode tree is done in a depth-first fashion. But
+   * instead of holding all INodeDirectory's in memory on the fly, only the
+   * path components to the current inode are held. This is to reduce memory
+   * consumption.
+   *
+   * @param parent       The directory inode to start listing in. The method
+   *                     returns immediately if it is null.
+   * @param zoneId       Id of the EZ inode
+   * @param startAfter   Name bytes of the child of parent the re-encrypt
+   *                     should start after.
+   * @param ezKeyVerName key version name handed down to the per-directory
+   *                     traversal.
+   * @throws IOException if thrown by the traversal.
+   * @throws InterruptedException if thrown by the traversal.
+   */
+  private void reencryptDir(final INodeDirectory parent, final long zoneId,
+      byte[] startAfter, final String ezKeyVerName)
+      throws IOException, InterruptedException {
+    List<byte[]> startAfters = new ArrayList<>();
+    if (parent == null) {
+      return;
+    }
+    INode curr = parent;
+    // construct startAfters all the way up to the zone inode.
+    // The list is ordered from the level right below the zone down to parent;
+    // its last element is the name to start after inside parent.
+    startAfters.add(startAfter);
+    while (curr.getId() != zoneId) {
+      startAfters.add(0, curr.getLocalNameBytes());
+      curr = curr.getParent();
+    }
+    curr = reencryptDirInt(zoneId, parent, startAfters, ezKeyVerName);
+    // reencryptDirInt pushes/pops cursor levels; an empty cursor ends the
+    // traversal.
+    while (!startAfters.isEmpty()) {
+      if (curr == null) {
+        // lock was reacquired, re-resolve path.
+        curr = resolvePaths(zoneId, startAfters);
+      }
+      curr = reencryptDirInt(zoneId, curr, startAfters, ezKeyVerName);
+    }
+  }
+
+  /**
+   * Resolve the cursor of re-encryption to an inode.
+   * <p>
+   * The parent of the lowest level startAfter is returned. If somewhere in the
+   * middle of startAfters changed, the parent of the lowest unchanged level is
+   * returned, and startAfters is truncated so that its last element is the
+   * name that could no longer be found. The traversal then continues with the
+   * next child after that name.
+   *
+   * @param zoneId      Id of the EZ inode.
+   * @param startAfters the cursor, represented by a list of path bytes. It
+   *                    may be shortened in place by this method.
+   * @return the parent inode corresponding to the (possibly truncated)
+   * startAfters.
+   * @throws FileNotFoundException if the EZ node (furthest parent) is
+   *                               deleted.
+   * @throws IOException declared for callers; see FileNotFoundException.
+   */
+  private INode resolvePaths(final long zoneId, List<byte[]> startAfters)
+      throws IOException {
+    // If the readlock was reacquired, we need to resolve the paths again
+    // in case things have changed. If our cursor file/dir is changed,
+    // continue from the next one.
+    INode zoneNode = dir.getInode(zoneId);
+    if (zoneNode == null) {
+      throw new FileNotFoundException("Zone " + zoneId + " is deleted.");
+    }
+    INodeDirectory parent = zoneNode.asDirectory();
+    // The last startAfter does not need to be resolved, since the search for
+    // nextChild will cover that automatically.
+    for (int i = 0; i < startAfters.size() - 1; ++i) {
+      INode curr =
+          parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID);
+      if (curr == null) {
+        // inode at this level has changed. Update startAfters to point to
+        // the next dir at the parent level, by dropping every startAfter
+        // below level i in one step. (Removing the tail one element at a
+        // time while also advancing i would stop half way.)
+        startAfters.subList(i + 1, startAfters.size()).clear();
+        break;
+      }
+      parent = curr.asDirectory();
+    }
+    return parent;
+  }
+
+  /**
+   * Submit the current batch to the thread pool.
+   *
+   * @param zoneId Id of the EZ INode
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void submitCurrentBatch(final long zoneId)
+      throws IOException, InterruptedException {
+    assert dir.hasReadLock();
+    if (currentBatch.isEmpty()) {
+      return;
+    }
+    ZoneSubmissionTracker zst = submissions.get(zoneId);
+    if (zst == null) {
+      zst = new ZoneSubmissionTracker();
+      submissions.put(zoneId, zst);
+    }
+    Future future = batchService
+        .submit(new EDEKReencryptCallable(zoneId, currentBatch, this));
+    zst.addTask(future);
+    LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
+        currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
+    currentBatch = new ReencryptionBatch(reencryptBatchSize);
+    // flip the pause flag if this is nth submission.
+    // The actual pause need to happen outside of the lock.
+    if (pauseAfterNthSubmission > 0) {
+      if (--pauseAfterNthSubmission == 0) {
+        shouldPauseForTesting = true;
+      }
+    }
+  }
+
+  /**
+   * A list of files (as {@link FileEdekInfo}) collected for one re-encryption
+   * request, together with the path of the first file for logging.
+   */
+  final class ReencryptionBatch {
+    // First file's path, for logging purpose.
+    private String firstFilePath;
+    // Per-file edek information, in the order the files were added.
+    private final List<FileEdekInfo> batch;
+
+    /** Creates a batch sized to the handler's reencryptBatchSize. */
+    ReencryptionBatch() {
+      this(reencryptBatchSize);
+    }
+
+    /**
+     * @param initialCapacity initial capacity of the backing list.
+     */
+    ReencryptionBatch(int initialCapacity) {
+      batch = new ArrayList<>(initialCapacity);
+    }
+
+    /**
+     * Appends a file to the batch. The full path of the first file added is
+     * remembered for logging.
+     *
+     * @param inode the file to add, must not be null.
+     * @throws IOException if creating the FileEdekInfo fails.
+     */
+    void add(final INodeFile inode) throws IOException {
+      assert dir.hasReadLock();
+      Preconditions.checkNotNull(inode, "INodeFile is null");
+      if (batch.isEmpty()) {
+        firstFilePath = inode.getFullPathName();
+      }
+      batch.add(new FileEdekInfo(dir, inode));
+    }
+
+    /** @return full path of the first file added, or null if none was. */
+    String getFirstFilePath() {
+      return firstFilePath;
+    }
+
+    /** @return true if the batch holds no files. */
+    boolean isEmpty() {
+      return batch.isEmpty();
+    }
+
+    /** @return number of files in the batch. */
+    int size() {
+      return batch.size();
+    }
+
+    /** Removes all files from the batch; firstFilePath is left untouched. */
+    void clear() {
+      batch.clear();
+    }
+
+    /** @return the backing list itself, not a copy. */
+    List<FileEdekInfo> getBatch() {
+      return batch;
+    }
+  }
+
+  /**
+   * Callable that asks the key provider to re-encrypt the edeks of one batch.
+   * Simply contacts the KMS for re-encryption. No NN locks held.
+   */
+  private static class EDEKReencryptCallable
+      implements Callable<ReencryptionTask> {
+    private final long zoneNodeId;
+    private final ReencryptionBatch batch;
+    private final ReencryptionHandler handler;
+
+    EDEKReencryptCallable(final long zoneId,
+        final ReencryptionBatch currentBatch, final ReencryptionHandler rh) {
+      zoneNodeId = zoneId;
+      batch = currentBatch;
+      handler = rh;
+    }
+
+    /**
+     * Re-encrypts the batch and wraps the outcome in a ReencryptionTask.
+     * An empty batch yields a task with zero failures; a failed provider
+     * call counts every file of the batch as a failure.
+     */
+    @Override
+    public ReencryptionTask call() {
+      LOG.info("Processing batched re-encryption for zone {}, batch size {},"
+          + " start:{}", zoneNodeId, batch.size(), batch.getFirstFilePath());
+      if (batch.isEmpty()) {
+        return new ReencryptionTask(zoneNodeId, 0, batch);
+      }
+      final Stopwatch kmsSW = new Stopwatch().start();
+
+      final boolean succeeded = reencryptEdeks();
+      final int numFailures = succeeded ? 0 : batch.size();
+      final String result = succeeded ? "Completed" : "Failed to";
+      LOG.info("{} re-encrypting one batch of {} edeks from KMS,"
+              + " time consumed: {}, start: {}.", result,
+          batch.size(), kmsSW.stop(), batch.getFirstFilePath());
+      return new ReencryptionTask(zoneNodeId, numFailures, batch);
+    }
+
+    /**
+     * Sends the existing edeks of the batch to the provider and, on
+     * success, stores the edek at each position back on the matching entry.
+     *
+     * @return false if the provider call threw, true otherwise.
+     */
+    private boolean reencryptEdeks() {
+      // communicate with the kms out of lock
+      final List<FileEdekInfo> entries = batch.getBatch();
+      final List<EncryptedKeyVersion> edeks = new ArrayList<>(batch.size());
+      for (FileEdekInfo entry : entries) {
+        edeks.add(entry.getExistingEdek());
+      }
+      // The provider already has LoadBalancingKMSClientProvider's retries.
+      // If that fails, just fail this callable.
+      try {
+        handler.ezManager.getProvider().reencryptEncryptedKeys(edeks);
+        EncryptionFaultInjector.getInstance().reencryptEncryptedKeys();
+      } catch (GeneralSecurityException | IOException ex) {
+        LOG.warn("Failed to re-encrypt one batch of {} edeks, start:{}",
+            batch.size(), batch.getFirstFilePath(), ex);
+        return false;
+      }
+      for (int i = 0; i < entries.size(); ++i) {
+        assert i < edeks.size();
+        entries.get(i).setEdek(edeks.get(i));
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Iterates the parent directory, and add direct children files to
+   * current batch. If batch size meets configured threshold, a Callable
+   * is created and sent to the thread pool, which will communicate to the KMS
+   * to get new edeks.
+   * <p>
+   * Locks could be released and reacquired when a Callable is created.
+   * <p>
+   * The cursor is updated in place. When descending into a sub-directory,
+   * its name replaces the last element and an empty name is appended. When
+   * the directory is finished, the last element is dropped and the new last
+   * element (if any) is replaced by the name of curr.
+   *
+   * @param zoneId       Id of the EZ INode
+   * @param curr         inode to work on, must not be null. Its children are
+   *                     listed if it is a directory, otherwise the children
+   *                     of its parent.
+   * @param startAfters  the cursor; its last element is the child name to
+   *                     start after in the listed directory.
+   * @param ezKeyVerName key version name passed to reencryptINode for every
+   *                     child.
+   * @return the inode to continue from, if lock is held in the entire
+   * process: the sub-directory to descend into, or the parent of curr when
+   * the directory is finished. Null if lock is released.
+   * @throws IOException if the zone is not ready, or thrown while
+   *                     processing children.
+   * @throws InterruptedException if thrown while submitting or throttling.
+   */
+  private INode reencryptDirInt(final long zoneId, INode curr,
+      List<byte[]> startAfters, final String ezKeyVerName)
+      throws IOException, InterruptedException {
+    assert dir.hasReadLock();
+    assert dir.getFSNamesystem().hasReadLock();
+    Preconditions.checkNotNull(curr, "Current inode can't be null");
+    checkZoneReady(zoneId);
+    final INodeDirectory parent =
+        curr.isDirectory() ? curr.asDirectory() : curr.getParent();
+    ReadOnlyList<INode> children =
+        parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Re-encrypting directory {}", parent.getFullPathName());
+    }
+
+    final byte[] startAfter = startAfters.get(startAfters.size() - 1);
+    boolean lockReleased = false;
+    for (int i = INodeDirectory.nextChild(children, startAfter);
+         i < children.size(); ++i) {
+      final INode inode = children.get(i);
+      if (!reencryptINode(inode, ezKeyVerName)) {
+        // inode wasn't added for re-encryption. Recurse down if it's a dir,
+        // skip otherwise.
+        if (!inode.isDirectory()) {
+          continue;
+        }
+        if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
+          // nested EZ, ignore.
+          LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
+              inode.getFullPathName(), inode.getId());
+          continue;
+        }
+        // add 1 level to the depth-first search.
+        curr = inode;
+        // Remember this sub-directory as the position in the current level,
+        // then push an empty name so the sub-directory is listed from its
+        // first child.
+        if (!startAfters.isEmpty()) {
+          startAfters.remove(startAfters.size() - 1);
+          startAfters.add(curr.getLocalNameBytes());
+        }
+        startAfters.add(HdfsFileStatus.EMPTY_NAME);
+        return lockReleased ? null : curr;
+      }
+      if (currentBatch.size() >= reencryptBatchSize) {
+        // Capture the position before the lock is dropped, so the listing
+        // can be resumed afterwards.
+        final byte[] currentStartAfter = inode.getLocalNameBytes();
+        final String parentPath = parent.getFullPathName();
+        submitCurrentBatch(zoneId);
+        lockReleased = true;
+        // Throttling (and the test pause) happen without the read locks.
+        readUnlock();
+        try {
+          throttle();
+          checkPauseForTesting();
+        } finally {
+          readLock();
+        }
+        checkZoneReady(zoneId);
+
+        // Things could have changed when the lock was released.
+        // Re-resolve the parent inode.
+        FSPermissionChecker pc = dir.getPermissionChecker();
+        INode newParent =
+            dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
+                .getLastINode();
+        if (newParent == null || !newParent.equals(parent)) {
+          // parent dir is deleted or recreated. We're done.
+          return null;
+        }
+        children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
+        // -1 to counter the ++ on the for loop
+        i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
+      }
+    }
+    // Successfully finished this dir, adjust pointers to 1 level up, and
+    // startAfter this dir.
+    startAfters.remove(startAfters.size() - 1);
+    if (!startAfters.isEmpty()) {
+      startAfters.remove(startAfters.size() - 1);
+      startAfters.add(curr.getLocalNameBytes());
+    }
+    curr = curr.getParent();
+    return lockReleased ? null : curr;
+  }
+
+  private void readLock() {
+    dir.getFSNamesystem().readLock();
+    dir.readLock();
+    throttleTimerLocked.start();
+  }
+
+  private void readUnlock() {
+    dir.readUnlock();
+    dir.getFSNamesystem().readUnlock("reencryptHandler");
+    throttleTimerLocked.stop();
+  }
+
+  /**
+   * Throttles the ReencryptionHandler in 3 aspects:
+   * 1. Prevents generating more Callables than the CPU could possibly handle.
+   * 2. Prevents generating more Callables than the ReencryptionUpdater can
+   *   handle, under its own throttling
+   * 3. Prevents contending FSN/FSD read locks. This is done based on the
+   *   DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
+   * <p>
+   * Item 1 and 2 are to control NN heap usage.
+   * <p>
+   * Both throttle timers are reset at the end, unless the configured ratio
+   * is 1.0 or larger, in which case item 3 is skipped entirely.
+   *
+   * @throws InterruptedException if interrupted while sleeping.
+   */
+  @VisibleForTesting
+  void throttle() throws InterruptedException {
+    // 1.
+    final int numCores = Runtime.getRuntime().availableProcessors();
+    if (taskQueue.size() >= numCores) {
+      LOG.debug("Re-encryption handler throttling because queue size {} is"
+          + " larger than number of cores {}", taskQueue.size(), numCores);
+      while (taskQueue.size() >= numCores) {
+        Thread.sleep(100);
+      }
+    }
+
+    // 2. if tasks are piling up on the updater, don't create new callables
+    // until the queue size goes down.
+    final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
+    int totalTasks = 0;
+    for (ZoneSubmissionTracker zst : submissions.values()) {
+      totalTasks += zst.getTasks().size();
+    }
+    if (totalTasks >= maxTasksPiled) {
+      LOG.debug("Re-encryption handler throttling because total tasks pending"
+          + " re-encryption updater is {}", totalTasks);
+      while (totalTasks >= maxTasksPiled) {
+        Thread.sleep(500);
+        // recount after every sleep.
+        totalTasks = 0;
+        for (ZoneSubmissionTracker zst : submissions.values()) {
+          totalTasks += zst.getTasks().size();
+        }
+      }
+    }
+
+    // 3.
+    if (throttleLimitHandlerRatio >= 1.0) {
+      return;
+    }
+    final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
+        * throttleLimitHandlerRatio);
+    final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
+              + " throttleTimerAll:{}", expect, actual,
+          throttleTimerAll.now(TimeUnit.MILLISECONDS));
+    }
+    if (expect - actual < 0) {
+      // in case throttleLimitHandlerRatio is very small, expect will be 0.
+      // so sleepMs should not be calculated from expect, to really meet the
+      // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
+      // should be 1000 - throttleTimerAll.now()
+      final long sleepMs =
+          (long) (actual / throttleLimitHandlerRatio) - throttleTimerAll
+              .now(TimeUnit.MILLISECONDS);
+      LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
+      // throttleTimerAll is read again above, so enough time may have passed
+      // to make sleepMs negative. Thread.sleep rejects negative values with
+      // an IllegalArgumentException, so only sleep when there is time left.
+      if (sleepMs > 0) {
+        Thread.sleep(sleepMs);
+      }
+    }
+    throttleTimerAll.reset().start();
+    throttleTimerLocked.reset();
+  }
+
+  /**
+   * Process an Inode for re-encryption. Add to current batch if it's a file,
+   * no-op otherwise.
+   *
+   * @param inode        the inode
+   * @param ezKeyVerName the EZ key version name to compare against; a file
+   *                     whose edek already uses this key version is skipped.
+   * @return true if inode is added to currentBatch and should be re-encrypted.
+   * false otherwise: could be inode is not a file, the file has no encryption
+   * info, or inode's edek's key version is not changed.
+   * @throws IOException if thrown while reading the file encryption info or
+   *                     adding the file to the batch.
+   * @throws InterruptedException declared for callers; not thrown by the
+   *                              visible code.
+   */
+  private boolean reencryptINode(final INode inode, final String ezKeyVerName)
+      throws IOException, InterruptedException {
+    // The lock check needs the assert keyword; a bare call would discard the
+    // result and check nothing.
+    assert dir.hasReadLock();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
+    }
+    if (!inode.isFile()) {
+      return false;
+    }
+    FileEncryptionInfo feInfo = FSDirEncryptionZoneOp
+        .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
+    if (feInfo == null) {
+      LOG.warn("File {} skipped re-encryption because it is not encrypted! "
+          + "This is very likely a bug.", inode.getId());
+      return false;
+    }
+    if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("File {} skipped re-encryption because edek's key version"
+            + " name is not changed.", inode.getFullPathName());
+      }
+      return false;
+    }
+    currentBatch.add(inode.asFile());
+    return true;
+  }
+
+  /**
+   * Verify that re-encryption of the zone may proceed. An exception is
+   * thrown when:
+   * 1. the zone has no re-encryption status,
+   * 2. the re-encryption is canceled,
+   * 3. the namesystem's safe mode check or WRITE operation check fails.
+   *
+   * @param zoneId Id of the EZ inode.
+   * @throws IOException if zone status does not exist / is cancelled, or if
+   *                     one of the namesystem checks fails.
+   */
+  void checkZoneReady(final long zoneId)
+      throws RetriableException, SafeModeException, IOException {
+    final ZoneReencryptionStatus status =
+        getReencryptionStatus().getZoneStatus(zoneId);
+    if (status == null) {
+      throw new IOException("Zone " + zoneId + " status cannot be found.");
+    }
+    if (status.isCanceled()) {
+      throw new IOException("Re-encryption is canceled for zone " + zoneId);
+    }
+    final FSNamesystem namesystem = dir.getFSNamesystem();
+    namesystem.checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt.");
+    // re-encryption should be cancelled when NN goes to standby. Just
+    // double checking for sanity.
+    namesystem.checkOperation(NameNode.OperationCategory.WRITE);
+  }
+
+  /**
+   * Called when a new zone is submitted for re-encryption. Wakes up one
+   * thread waiting on this handler's monitor, such as the background thread
+   * waiting out its interval in {@link #run()}.
+   */
+  synchronized void notifyNewSubmission() {
+    LOG.debug("Notifying handler for new re-encryption command.");
+    notify();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
new file mode 100644
index 0000000..690a0e9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionHandler.ReencryptionBatch;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY;
+
+/**
+ * Class for finalizing re-encrypt EDEK operations, by updating file xattrs with
+ * edeks returned from reencryption.
+ * <p>
+ * The tasks are submitted by ReencryptionHandler.
+ * <p>
+ * It is assumed only 1 Updater will be running, since updating file xattrs
+ * requires namespace write lock, and performance gain from multi-threading
+ * is limited.
+ */
+@InterfaceAudience.Private
+public final class ReencryptionUpdater implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ReencryptionUpdater.class);
+
+  // Test hooks. shouldPauseForTesting is toggled by pauseForTesting() and
+  // resumeForTesting(); the other two are set together by
+  // pauseForTestingAfterNthCheckpoint().
+  private volatile boolean shouldPauseForTesting = false;
+  private volatile int pauseAfterNthCheckpoint = 0;
+  private volatile long pauseZoneId = 0;
+
+  // Read from DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY in the
+  // constructor, which requires it to be positive.
+  private double throttleLimitRatio;
+  // throttleTimerAll is started at the beginning of run().
+  // NOTE(review): presumably the two timers measure total time vs. time
+  // spent holding the lock, as in ReencryptionHandler - confirm.
+  private final StopWatch throttleTimerAll = new StopWatch();
+  private final StopWatch throttleTimerLocked = new StopWatch();
+
+  // NOTE(review): presumably a retry wait in milliseconds after a fault; its
+  // use is not visible in this part of the file - confirm.
+  private volatile long faultRetryInterval = 60000;
+
+  /**
+   * Bookkeeping for the re-encryption submissions of one zone: the futures
+   * submitted so far, whether submission has finished, and counters about
+   * how far the futures have been processed.
+   */
+  static final class ZoneSubmissionTracker {
+    private boolean submissionDone = false;
+    private LinkedList<Future> tasks = new LinkedList<>();
+    private int numCheckpointed = 0;
+    private int numFutureDone = 0;
+
+    ZoneSubmissionTracker() {
+      // All state is set up by the field initializers.
+    }
+
+    /** @return the list of submitted futures itself, not a copy. */
+    LinkedList<Future> getTasks() {
+      return tasks;
+    }
+
+    /** Cancels every tracked future, interrupting running ones. */
+    void cancelAllTasks() {
+      if (tasks.isEmpty()) {
+        return;
+      }
+      LOG.info("Cancelling {} re-encryption tasks", tasks.size());
+      for (Future task : tasks) {
+        task.cancel(true);
+      }
+    }
+
+    /** Appends a submitted future to the end of the task list. */
+    void addTask(final Future task) {
+      tasks.add(task);
+    }
+
+    /** @return true once submission is done and no task is left. */
+    private boolean isCompleted() {
+      return submissionDone && tasks.isEmpty();
+    }
+
+    /** Records that no more tasks will be submitted for this zone. */
+    void setSubmissionDone() {
+      submissionDone = true;
+    }
+  }
+
+  /**
+   * Class representing the task for one batch of a re-encryption command. It
+   * also contains statistics about how far this single batch has been executed.
+   */
+  static final class ReencryptionTask {
+    // Id of the EZ inode this batch belongs to.
+    private final long zoneId;
+    // Set to true at the end of processTaskEntries().
+    private boolean processed = false;
+    // Number of files whose encryption info was replaced by
+    // processTaskEntries().
+    private int numFilesUpdated = 0;
+    // Failure count given at construction; processTaskEntries() only updates
+    // files when this is 0.
+    private int numFailures = 0;
+    // Path of the last file updated by processTaskEntries(), null if none.
+    private String lastFile = null;
+    // The files of this task.
+    private final ReencryptionBatch batch;
+
+    /**
+     * @param id       Id of the EZ inode.
+     * @param failures number of failures encountered for this batch.
+     * @param theBatch the batch of files of this task.
+     */
+    ReencryptionTask(final long id, final int failures,
+        final ReencryptionBatch theBatch) {
+      zoneId = id;
+      numFailures = failures;
+      batch = theBatch;
+    }
+  }
+
+  /**
+   * Class that encapsulates re-encryption details of a file. It contains the
+   * file inode id, stores the initial edek of the file, and the new edek
+   * after re-encryption.
+   * <p>
+   * Assumptions are the object initialization happens when dir lock is held,
+   * and inode is valid and is encrypted during initialization.
+   * <p>
+   * Namespace changes may happen during re-encryption, and if inode is changed
+   * the re-encryption is skipped.
+   */
+  static final class FileEdekInfo {
+    private final long inodeId;
+    // edek read from the file's encryption info at construction time.
+    private final EncryptedKeyVersion existingEdek;
+    // edek after re-encryption; null until setEdek() is called.
+    private EncryptedKeyVersion edek = null;
+
+    /**
+     * Captures the inode id and the current edek of the file.
+     *
+     * @param dir   the FSDirectory, whose read lock is asserted.
+     * @param inode the file, must not be null and must have encryption info.
+     * @throws IOException if thrown while reading the file encryption info.
+     */
+    FileEdekInfo(FSDirectory dir, INodeFile inode) throws IOException {
+      assert dir.hasReadLock();
+      Preconditions.checkNotNull(inode, "INodeFile is null");
+      inodeId = inode.getId();
+      final FileEncryptionInfo fei = FSDirEncryptionZoneOp
+          .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
+      Preconditions.checkNotNull(fei,
+          "FileEncryptionInfo is null for " + inodeId);
+      existingEdek = EncryptedKeyVersion
+          .createForDecryption(fei.getKeyName(), fei.getEzKeyVersionName(),
+              fei.getIV(), fei.getEncryptedDataEncryptionKey());
+    }
+
+    /** @return id of the file inode. */
+    long getInodeId() {
+      return inodeId;
+    }
+
+    /** @return the edek captured at construction. */
+    EncryptedKeyVersion getExistingEdek() {
+      return existingEdek;
+    }
+
+    /**
+     * @param ekv the re-encrypted edek, asserted to be non-null.
+     */
+    void setEdek(final EncryptedKeyVersion ekv) {
+      assert ekv != null;
+      edek = ekv;
+    }
+  }
+
+  /**
+   * Test hook: sets the pause flag and notifies one thread waiting on this
+   * updater's monitor.
+   */
+  @VisibleForTesting
+  synchronized void pauseForTesting() {
+    shouldPauseForTesting = true;
+    LOG.info("Pausing re-encrypt updater for testing.");
+    notify();
+  }
+
+  /**
+   * Test hook: clears the pause flag and notifies one thread waiting on this
+   * updater's monitor.
+   */
+  @VisibleForTesting
+  synchronized void resumeForTesting() {
+    shouldPauseForTesting = false;
+    LOG.info("Resuming re-encrypt updater for testing.");
+    notify();
+  }
+
+  /**
+   * Test hook: records the zone and the checkpoint count after which the
+   * updater should pause. Asserts that no such request is pending.
+   *
+   * @param zoneId Id of the zone the count applies to.
+   * @param count  number of checkpoints to pause after.
+   */
+  @VisibleForTesting
+  void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
+    assert pauseAfterNthCheckpoint == 0;
+    pauseAfterNthCheckpoint = count;
+    pauseZoneId = zoneId;
+  }
+
+  // Collaborators handed in by the constructor.
+  private final FSDirectory dir;
+  private final CompletionService<ReencryptionTask> batchService;
+  private final ReencryptionHandler handler;
+
+  /**
+   * Creates the updater and reads its throttle ratio from the configuration.
+   *
+   * @param fsd     the FSDirectory to update.
+   * @param service completion service the re-encryption tasks run on.
+   * @param rh      the handler that submits the tasks.
+   * @param conf    configuration providing the updater throttle ratio.
+   * @throws IllegalArgumentException if the configured ratio is not positive.
+   */
+  ReencryptionUpdater(final FSDirectory fsd,
+      final CompletionService<ReencryptionTask> service,
+      final ReencryptionHandler rh, final Configuration conf) {
+    this.dir = fsd;
+    this.batchService = service;
+    this.handler = rh;
+    final double ratio =
+        conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY,
+            DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT);
+    this.throttleLimitRatio = ratio;
+    Preconditions.checkArgument(ratio > 0.0f,
+        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY
+            + " is not positive.");
+  }
+
+  /**
+   * Called by the submission thread once every task of a zone has been
+   * submitted. If the handler has no tracker for the zone, nothing was
+   * submitted, and a dummy tracker is added through the handler instead.
+   *
+   * @param zoneId Id of the zone inode.
+   * @throws IOException if thrown while adding the dummy tracker.
+   * @throws InterruptedException if thrown while adding the dummy tracker.
+   */
+  void markZoneSubmissionDone(final long zoneId)
+      throws IOException, InterruptedException {
+    final ZoneSubmissionTracker tracker = handler.getTracker(zoneId);
+    if (tracker == null) {
+      // Caller thinks submission is done, but no tasks submitted - meaning
+      // no files in the EZ need to be re-encrypted. Complete directly.
+      handler.addDummyTracker(zoneId);
+      return;
+    }
+    tracker.setSubmissionDone();
+  }
+
+  /**
+   * Main loop of the updater: starts the overall throttle timer, then keeps
+   * calling takeAndProcessTasks(). The loop ends on interruption or on any
+   * Throwable other than an IOException.
+   */
+  @Override
+  public void run() {
+    throttleTimerAll.start();
+    while (true) {
+      try {
+        // Assuming single-threaded updater.
+        takeAndProcessTasks();
+      } catch (InterruptedException ie) {
+        LOG.warn("Re-encryption updater thread interrupted. Exiting.");
+        // Restore the interrupt flag before leaving the thread.
+        Thread.currentThread().interrupt();
+        return;
+      } catch (IOException ioe) {
+        // IOExceptions are logged and the loop continues.
+        LOG.warn("Re-encryption updater thread exception.", ioe);
+      } catch (Throwable t) {
+        LOG.error("Re-encryption updater thread exiting.", t);
+        return;
+      }
+    }
+  }
+
+  /**
+   * Process a completed ReencryptionTask. Each inode id is resolved to an INode
+   * object, skip if the inode is deleted.
+   * <p>
+   * Only file xattr is updated by this method. Re-encryption progress is not
+   * updated.
+   * <p>
+   * Files are only updated when the batch is not empty and the task has no
+   * failures. Entries that are skipped are removed from the task's batch.
+   * The task is marked as processed in every case.
+   *
+   * @param zoneNodePath full path of the EZ inode.
+   * @param task     the completed task. Its lastFile, numFilesUpdated and
+   *                 processed fields are updated here.
+   * @throws IOException if thrown while reading or setting the file
+   *                     encryption info.
+   * @throws InterruptedException declared for callers; not thrown by the
+   *                              visible code.
+   */
+  private void processTaskEntries(final String zoneNodePath,
+      final ReencryptionTask task) throws IOException, InterruptedException {
+    assert dir.hasWriteLock();
+    if (!task.batch.isEmpty() && task.numFailures == 0) {
+      LOG.debug(
+          "Updating file xattrs for re-encrypting zone {}," + " starting at {}",
+          zoneNodePath, task.batch.getFirstFilePath());
+      // An explicit iterator is used so skipped entries can be removed from
+      // the batch while iterating.
+      for (Iterator<FileEdekInfo> it = task.batch.getBatch().iterator();
+           it.hasNext();) {
+        FileEdekInfo entry = it.next();
+        // resolve the inode again, and skip if it doesn't exist
+        LOG.trace("Updating {} for re-encryption.", entry.getInodeId());
+        final INode inode = dir.getInode(entry.getInodeId());
+        if (inode == null) {
+          LOG.debug("INode {} doesn't exist, skipping re-encrypt.",
+              entry.getInodeId());
+          // also remove from batch so later it's not saved.
+          it.remove();
+          continue;
+        }
+
+        // Cautiously check file encryption info, and only update if we're sure
+        // it's still using the same edek.
+        Preconditions.checkNotNull(entry.edek);
+        final FileEncryptionInfo fei = FSDirEncryptionZoneOp
+            .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
+        // Skip if the file's key name differs from the new edek's key name.
+        if (!fei.getKeyName().equals(entry.edek.getEncryptionKeyName())) {
+          LOG.debug("Inode {} EZ key changed, skipping re-encryption.",
+              entry.getInodeId());
+          it.remove();
+          continue;
+        }
+        // Skip if the file already carries the key version of the new edek.
+        if (fei.getEzKeyVersionName()
+            .equals(entry.edek.getEncryptionKeyVersionName())) {
+          LOG.debug(
+              "Inode {} EZ key version unchanged, skipping re-encryption.",
+              entry.getInodeId());
+          it.remove();
+          continue;
+        }
+        // Skip if the file's edek is no longer the one that was sent for
+        // re-encryption.
+        if (!Arrays.equals(fei.getEncryptedDataEncryptionKey(),
+            entry.existingEdek.getEncryptedKeyVersion().getMaterial())) {
+          LOG.debug("Inode {} existing edek changed, skipping re-encryption",
+              entry.getInodeId());
+          it.remove();
+          continue;
+        }
+        // Cipher suite, protocol version and key name are carried over from
+        // the existing info; edek material, IV and key version name come
+        // from the new edek.
+        FileEncryptionInfo newFei = new FileEncryptionInfo(fei.getCipherSuite(),
+            fei.getCryptoProtocolVersion(),
+            entry.edek.getEncryptedKeyVersion().getMaterial(),
+            entry.edek.getEncryptedKeyIv(), fei.getKeyName(),
+            entry.edek.getEncryptionKeyVersionName());
+        final INodesInPath iip = INodesInPath.fromINode(inode);
+        FSDirEncryptionZoneOp
+            .setFileEncryptionInfo(dir, iip, newFei, XAttrSetFlag.REPLACE);
+        task.lastFile = iip.getPath();
+        ++task.numFilesUpdated;
+      }
+
+      LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption,"
+              + " starting:{}.", task.numFilesUpdated, task.batch.size(),
+          zoneNodePath, task.batch.getFirstFilePath());
+    }
+    task.processed = true;
+  }
+
+  /**
+   * Walk the zone's submitted tasks from the head and advance the
+   * re-encryption checkpoint over every leading task that is both done and
+   * processed. The walk stops at the first task that is not, because the
+   * checkpoint means all files before it have been re-encrypted.
+   * <p>
+   * Each task consumed this way is removed from the tracker. If the tracker
+   * reports completion afterwards, the zone's re-encryption is completed
+   * through the handler.
+   *
+   * @param zoneNode the EZ inode.
+   * @param tracker  the zone submission tracker.
+   * @return the list containing the last checkpointed xattr. Empty if
+   *   no checkpoint happened.
+   * @throws ExecutionException
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private List<XAttr> processCheckpoints(final INode zoneNode,
+      final ZoneSubmissionTracker tracker)
+      throws ExecutionException, IOException, InterruptedException {
+    assert dir.hasWriteLock();
+    final long zoneId = zoneNode.getId();
+    final String zonePath = zoneNode.getFullPathName();
+    final ZoneReencryptionStatus status =
+        handler.getReencryptionStatus().getZoneStatus(zoneId);
+    assert status != null;
+    // always start from the beginning, because the checkpoint means all files
+    // before it are re-encrypted.
+    final LinkedList<Future> tasks = tracker.getTasks();
+    final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+    for (ListIterator<Future> iter = tasks.listIterator(); iter.hasNext();) {
+      final Future<ReencryptionTask> curr = iter.next();
+      if (!curr.isDone()) {
+        // still has earlier tasks not completed, skip here.
+        break;
+      }
+      final ReencryptionTask task = curr.get();
+      if (!task.processed) {
+        // finished by the executor, but its entries are not applied yet.
+        break;
+      }
+      LOG.debug("Updating re-encryption checkpoint with completed task."
+          + " last: {} size:{}.", task.lastFile, task.batch.size());
+      assert zoneId == task.zoneId;
+      try {
+        final XAttr xattr = FSDirEncryptionZoneOp
+            .updateReencryptionProgress(dir, zoneNode, status, task.lastFile,
+                task.numFilesUpdated, task.numFailures);
+        // only the most recent checkpoint xattr is kept.
+        xAttrs.clear();
+        xAttrs.add(xattr);
+      } catch (IOException ie) {
+        LOG.warn("Failed to update re-encrypted progress to xattr for zone {}",
+            zonePath, ie);
+        ++task.numFailures;
+      }
+      // the task counts as checkpointed even if the xattr update failed.
+      ++tracker.numCheckpointed;
+      iter.remove();
+    }
+    if (tracker.isCompleted()) {
+      LOG.debug("Removed re-encryption tracker for zone {} because it completed"
+              + " with {} tasks.", zonePath, tracker.numCheckpointed);
+      return handler.completeReencryption(zoneNode);
+    }
+    return xAttrs;
+  }
+
+  /**
+   * Take the next finished task from the batch service and process it under
+   * the FSN write lock. Blocks until a task is available.
+   * <p>
+   * A cancelled task is skipped. A RetriableException or SafeModeException
+   * makes the same task be retried after sleeping for the fault retry
+   * interval; any other IOException marks the task as failed and processed.
+   * The edit log is synced after every attempt.
+   *
+   * @throws Exception if taking, throttling or processing the task fails
+   *   with anything other than the IOExceptions handled above.
+   */
+  private void takeAndProcessTasks() throws Exception {
+    final Future<ReencryptionTask> completed = batchService.take();
+    throttle();
+    checkPauseForTesting();
+    if (completed.isCancelled()) {
+      // get() on a cancelled Future throws CancellationException, so this
+      // check has to come before get() and has to return.
+      LOG.debug("Skipped a canceled re-encryption task.");
+      return;
+    }
+    final ReencryptionTask task = completed.get();
+
+    boolean shouldRetry;
+    do {
+      dir.getFSNamesystem().writeLock();
+      try {
+        throttleTimerLocked.start();
+        processTask(task);
+        shouldRetry = false;
+      } catch (RetriableException | SafeModeException re) {
+        // Keep retrying until succeed.
+        LOG.info("Exception when processing re-encryption task for zone {}, "
+                + "retrying...", task.zoneId, re);
+        shouldRetry = true;
+      } catch (IOException ioe) {
+        LOG.warn("Failure processing re-encryption task for zone {}",
+            task.zoneId, ioe);
+        ++task.numFailures;
+        task.processed = true;
+        shouldRetry = false;
+      } finally {
+        dir.getFSNamesystem().writeUnlock("reencryptUpdater");
+        throttleTimerLocked.stop();
+      }
+      // logSync regardless, to prevent edit log buffer overflow triggering
+      // logSync inside FSN writelock.
+      dir.getEditLog().logSync();
+      if (shouldRetry) {
+        // Back off only after the FSN write lock is released, so other
+        // namesystem operations are not blocked for the retry interval.
+        Thread.sleep(faultRetryInterval);
+      }
+    } while (shouldRetry);
+  }
+
+  /**
+   * Process one returned re-encryption task: update the file xattrs of its
+   * batch and advance the zone's checkpoint while holding the FSDirectory
+   * write lock, then log the resulting changes after that lock is released.
+   * <p>
+   * Returns without doing anything if the zone inode no longer exists.
+   *
+   * @param task the returned task.
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws IOException
+   */
+  private void processTask(ReencryptionTask task)
+      throws InterruptedException, ExecutionException, IOException {
+    final List<XAttr> xAttrs;
+    final String zonePath;
+    dir.writeLock();
+    try {
+      handler.checkZoneReady(task.zoneId);
+      final INode zoneNode = dir.getInode(task.zoneId);
+      if (zoneNode == null) {
+        // ez removed.
+        return;
+      }
+      zonePath = zoneNode.getFullPathName();
+      LOG.info("Processing returned re-encryption task for zone {}({}), "
+              + "batch size {}, start:{}", zonePath, task.zoneId,
+          task.batch.size(), task.batch.getFirstFilePath());
+      final ZoneSubmissionTracker tracker =
+          handler.getTracker(zoneNode.getId());
+      Preconditions.checkNotNull(tracker, "zone tracker not found " + zonePath);
+      // counted before the entries are applied; checkPauseForTesting compares
+      // against this counter.
+      tracker.numFutureDone++;
+      // fault injection hook, called before the task's entries are applied.
+      EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask();
+      processTaskEntries(zonePath, task);
+      // fault injection hook, called before the checkpoint is advanced.
+      EncryptionFaultInjector.getInstance().reencryptUpdaterProcessCheckpoint();
+      xAttrs = processCheckpoints(zoneNode, tracker);
+    } finally {
+      dir.writeUnlock();
+    }
+    // The remaining calls run after the FSDirectory write lock is released.
+    FSDirEncryptionZoneOp.saveFileXAttrsForBatch(dir, task.batch.getBatch());
+    if (!xAttrs.isEmpty()) {
+      // only log the zone xattr when a checkpoint actually happened.
+      dir.getEditLog().logSetXAttrs(zonePath, xAttrs, false);
+    }
+  }
+
+  private synchronized void checkPauseForTesting() throws InterruptedException {
+    assert !dir.hasWriteLock();
+    assert !dir.getFSNamesystem().hasWriteLock();
+    if (pauseAfterNthCheckpoint != 0) {
+      ZoneSubmissionTracker tracker =
+          handler.unprotectedGetTracker(pauseZoneId);
+      if (tracker != null) {
+        if (tracker.numFutureDone == pauseAfterNthCheckpoint) {
+          shouldPauseForTesting = true;
+          pauseAfterNthCheckpoint = 0;
+        }
+      }
+    }
+    while (shouldPauseForTesting) {
+      LOG.info("Sleeping in the re-encryption updater for unit test.");
+      wait();
+      LOG.info("Continuing re-encryption updater after pausing.");
+    }
+  }
+
+  /**
+   * Throttles the ReencryptionUpdater so that it does not contend too much
+   * for the FSN/FSD write locks. The time spent holding the lock is kept
+   * within throttleLimitRatio of the total elapsed time, by sleeping when the
+   * locked time is over that share. A ratio of 1.0 or above disables
+   * throttling.
+   * <p>
+   * Both timers are reset at the end of every throttled call, so each call
+   * only looks at the time since the previous one.
+   *
+   * @throws InterruptedException if interrupted while sleeping.
+   */
+  private void throttle() throws InterruptedException {
+    if (throttleLimitRatio >= 1.0) {
+      return;
+    }
+
+    final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
+        * throttleLimitRatio);
+    final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Re-encryption updater throttling expect: {}, actual: {},"
+              + " throttleTimerAll:{}", expect, actual,
+          throttleTimerAll.now(TimeUnit.MILLISECONDS));
+    }
+    if (expect - actual < 0) {
+      // in case throttleLimitRatio is very small, expect will be 0.
+      // so sleepMs should not be calculated from expect, to really meet the
+      // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
+      // should be 1000 - throttleTimerAll.now()
+      final long sleepMs =
+          (long) (actual / throttleLimitRatio) - throttleTimerAll
+              .now(TimeUnit.MILLISECONDS);
+      // throttleTimerAll is read again here, later than for expect, so
+      // sleepMs can be zero or negative. Thread.sleep rejects negative
+      // values with an IllegalArgumentException, so only sleep when needed.
+      if (sleepMs > 0) {
+        LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
+        Thread.sleep(sleepMs);
+      }
+    }
+    throttleTimerAll.reset().start();
+    throttleTimerLocked.reset();
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org