You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/08/24 19:36:39 UTC

[46/50] [abbrv] hadoop git commit: HDFS-10899. Add functionality to re-encrypt EDEKs.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
index 22039d1..2552cf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
@@ -31,6 +32,7 @@ import java.util.Map;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.fs.FileEncryptionInfo;
@@ -42,15 +44,22 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ZoneEncryptionInfoProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
 import org.apache.hadoop.security.SecurityUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.util.Time;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 /**
@@ -216,18 +225,206 @@ final class FSDirEncryptionZoneOp {
     }
   }
 
+  /**
+   * Start re-encryption of an encryption zone.
+   * <p>
+   * Resolves {@code zone} for write under the FSDirectory write lock, asks
+   * the encryption zone manager for the updated zone xattr, and writes that
+   * xattr to the edit log after the directory lock has been released.
+   *
+   * @param fsd the FSDirectory
+   * @param zone path of the encryption zone
+   * @param keyVersionName target key version for the re-encryption
+   * @param logRetryCache passed through to {@code logSetXAttrs}
+   * @throws IOException if resolving the path or starting re-encryption fails
+   */
+  static void reencryptEncryptionZone(final FSDirectory fsd,
+      final String zone, final String keyVersionName,
+      final boolean logRetryCache) throws IOException {
+    final FSPermissionChecker checker = fsd.getPermissionChecker();
+    final XAttr zoneXAttr;
+    fsd.writeLock();
+    try {
+      final INodesInPath zoneIIP =
+          fsd.resolvePath(checker, zone, DirOp.WRITE);
+      zoneXAttr =
+          fsd.ezManager.reencryptEncryptionZone(zoneIIP, keyVersionName);
+    } finally {
+      fsd.writeUnlock();
+    }
+    final List<XAttr> toLog = Lists.newArrayListWithCapacity(1);
+    toLog.add(zoneXAttr);
+    fsd.getEditLog().logSetXAttrs(zone, toLog, logRetryCache);
+  }
+
+  /**
+   * Cancel re-encryption of an encryption zone.
+   * <p>
+   * Resolves {@code zone} for write under the FSDirectory write lock and asks
+   * the encryption zone manager to cancel. Any xattrs it returns are written
+   * to the edit log after the directory lock is released; nothing is logged
+   * when it returns null or an empty list.
+   *
+   * @param fsd the FSDirectory
+   * @param zone path of the encryption zone
+   * @param logRetryCache passed through to {@code logSetXAttrs}
+   * @throws IOException if resolving the path or cancelling fails
+   */
+  static void cancelReencryptEncryptionZone(final FSDirectory fsd,
+      final String zone, final boolean logRetryCache) throws IOException {
+    final FSPermissionChecker checker = fsd.getPermissionChecker();
+    final List<XAttr> cancelXAttrs;
+    fsd.writeLock();
+    try {
+      final INodesInPath zoneIIP =
+          fsd.resolvePath(checker, zone, DirOp.WRITE);
+      cancelXAttrs = fsd.ezManager.cancelReencryptEncryptionZone(zoneIIP);
+    } finally {
+      fsd.writeUnlock();
+    }
+    // Only record an edit when the cancel actually produced xattr changes.
+    if (cancelXAttrs == null || cancelXAttrs.isEmpty()) {
+      return;
+    }
+    fsd.getEditLog().logSetXAttrs(zone, cancelXAttrs, logRetryCache);
+  }
+
+  /**
+   * Fetch one batch of zone re-encryption statuses under the FSDirectory
+   * read lock.
+   *
+   * @param fsd the FSDirectory
+   * @param prevId cursor from the previous batch, forwarded unchanged to the
+   *               encryption zone manager
+   * @return the batch returned by the encryption zone manager
+   * @throws IOException propagated from the encryption zone manager
+   */
+  static BatchedListEntries<ZoneReencryptionStatus> listReencryptionStatus(
+      final FSDirectory fsd, final long prevId)
+      throws IOException {
+    final BatchedListEntries<ZoneReencryptionStatus> batch;
+    fsd.readLock();
+    try {
+      batch = fsd.ezManager.listReencryptionStatus(prevId);
+    } finally {
+      fsd.readUnlock();
+    }
+    return batch;
+  }
+
+  /**
+   * Record that re-encryption has been submitted for a zone.
+   * <p>
+   * Replaces the zone's encryption zone xattr with one whose re-encryption
+   * info carries {@code ezKeyVersionName}, the current time as submission
+   * time, and zeroed progress counters. The in-memory re-encryption status is
+   * updated as a side effect of setting the xattr. Must be called with the
+   * FSDirectory write lock held; the caller should logSync afterwards,
+   * outside of the FSN lock.
+   *
+   * @param fsd the FSDirectory
+   * @param iip resolved path of the encryption zone root
+   * @param ezKeyVersionName target key version for the re-encryption
+   * @return the xattr that was set
+   * @throws IOException if the zone xattr is missing or cannot be parsed
+   */
+  static XAttr updateReencryptionSubmitted(final FSDirectory fsd,
+      final INodesInPath iip, final String ezKeyVersionName)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    Preconditions.checkNotNull(ezKeyVersionName, "ezKeyVersionName is null.");
+    final ZoneEncryptionInfoProto current = getZoneEncryptionInfoProto(iip);
+    Preconditions.checkNotNull(current, "ZoneEncryptionInfoProto is null.");
+
+    // Fresh submission: not canceled, nothing processed or failed yet, no
+    // completion time and no last-processed file.
+    final ReencryptionInfoProto submitted = PBHelperClient.convert(
+        ezKeyVersionName, Time.now(), false, 0, 0, null, null);
+    final ZoneEncryptionInfoProto updated = PBHelperClient.convert(
+        PBHelperClient.convert(current.getSuite()),
+        PBHelperClient.convert(current.getCryptoProtocolVersion()),
+        current.getKeyName(), submitted);
+
+    final XAttr zoneXAttr = XAttrHelper.buildXAttr(
+        CRYPTO_XATTR_ENCRYPTION_ZONE, updated.toByteArray());
+    final List<XAttr> toSet = Lists.newArrayList(zoneXAttr);
+    FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, toSet,
+        EnumSet.of(XAttrSetFlag.REPLACE));
+    return zoneXAttr;
+  }
+
+  /**
+   * Record re-encryption progress (start or checkpoint) for a zone.
+   * <p>
+   * Replaces the zone's encryption zone xattr with one whose re-encryption
+   * info keeps the key version name and submission time of
+   * {@code origStatus}, adds this checkpoint's counts to the totals already
+   * in {@code origStatus}, and records {@code lastFile}. The in-memory
+   * re-encryption status is updated as a side effect of setting the xattr.
+   * Must be called with the FSDirectory write lock held; the caller should
+   * logSync afterwards, outside of the FSN lock.
+   *
+   * @param fsd the FSDirectory
+   * @param zoneNode inode of the encryption zone root
+   * @param origStatus current re-encryption status of the zone
+   * @param lastFile path of the last file handled at this checkpoint
+   * @param numReencrypted files re-encrypted since {@code origStatus}
+   * @param numFailures re-encryption failures since {@code origStatus}
+   * @return the xattr that was set
+   * @throws IOException if the zone xattr is missing or cannot be parsed
+   */
+  static XAttr updateReencryptionProgress(final FSDirectory fsd,
+      final INode zoneNode, final ZoneReencryptionStatus origStatus,
+      final String lastFile, final long numReencrypted, final long numFailures)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    Preconditions.checkNotNull(zoneNode, "Zone node is null");
+    final INodesInPath zoneIIP = INodesInPath.fromINode(zoneNode);
+    final ZoneEncryptionInfoProto current =
+        getZoneEncryptionInfoProto(zoneIIP);
+    Preconditions.checkNotNull(current, "ZoneEncryptionInfoProto is null.");
+    Preconditions.checkNotNull(origStatus,
+        "Null status for " + zoneIIP.getPath());
+
+    // Still running: not canceled, and no completion time is recorded.
+    final ReencryptionInfoProto progress = PBHelperClient.convert(
+        origStatus.getEzKeyVersionName(),
+        origStatus.getSubmissionTime(),
+        false,
+        origStatus.getFilesReencrypted() + numReencrypted,
+        origStatus.getNumReencryptionFailures() + numFailures,
+        null,
+        lastFile);
+    final ZoneEncryptionInfoProto updated = PBHelperClient.convert(
+        PBHelperClient.convert(current.getSuite()),
+        PBHelperClient.convert(current.getCryptoProtocolVersion()),
+        current.getKeyName(), progress);
+
+    final XAttr zoneXAttr = XAttrHelper.buildXAttr(
+        CRYPTO_XATTR_ENCRYPTION_ZONE, updated.toByteArray());
+    final List<XAttr> toSet = Lists.newArrayList(zoneXAttr);
+    FSDirXAttrOp.unprotectedSetXAttrs(fsd, zoneIIP, toSet,
+        EnumSet.of(XAttrSetFlag.REPLACE));
+    return zoneXAttr;
+  }
+
+  /**
+   * Record that re-encryption of a zone has finished, either because it was
+   * canceled or because every file was processed.
+   * <p>
+   * Marks the zone completed in the in-memory re-encryption status, then
+   * replaces the zone's encryption zone xattr with one carrying the final
+   * counters and canceled flag from {@code origStatus} plus the current time
+   * as completion time. This method does not write to the edit log itself;
+   * it returns the xattrs that were set. Must be called with the FSDirectory
+   * write lock held; the caller should logSync afterwards, outside of the
+   * FSN lock.
+   *
+   * @param fsd the FSDirectory
+   * @param zoneIIP resolved path of the encryption zone root
+   * @param origStatus status of the zone being finished; must not be null
+   * @return the xattrs that were set
+   * @throws IOException if the zone xattr is missing or cannot be parsed
+   */
+  static List<XAttr> updateReencryptionFinish(final FSDirectory fsd,
+      final INodesInPath zoneIIP, final ZoneReencryptionStatus origStatus)
+      throws IOException {
+    assert origStatus != null;
+    assert fsd.hasWriteLock();
+    final long zoneId = zoneIIP.getLastINode().getId();
+    fsd.ezManager.getReencryptionStatus().markZoneCompleted(zoneId);
+    final List<XAttr> finishXAttrs = Lists.newArrayList(
+        generateNewXAttrForReencryptionFinish(zoneIIP, origStatus));
+    FSDirXAttrOp.unprotectedSetXAttrs(fsd, zoneIIP, finishXAttrs,
+        EnumSet.of(XAttrSetFlag.REPLACE));
+    return finishXAttrs;
+  }
+
+  /**
+   * Build the encryption zone xattr describing a finished re-encryption.
+   * <p>
+   * Keeps the key version, submission time, canceled flag and counters of
+   * {@code status}, stamps the current time as completion time, and passes
+   * no last-processed file. The xattr is only built, not set.
+   *
+   * @param iip resolved path of the encryption zone root
+   * @param status re-encryption status of the zone
+   * @return the new encryption zone xattr
+   * @throws IOException if the existing zone xattr is missing or unparsable
+   */
+  static XAttr generateNewXAttrForReencryptionFinish(final INodesInPath iip,
+      final ZoneReencryptionStatus status) throws IOException {
+    final ZoneEncryptionInfoProto current = getZoneEncryptionInfoProto(iip);
+    final ReencryptionInfoProto finished = PBHelperClient.convert(
+        status.getEzKeyVersionName(),
+        status.getSubmissionTime(),
+        status.isCanceled(),
+        status.getFilesReencrypted(),
+        status.getNumReencryptionFailures(),
+        Time.now(),
+        null);
+    final ZoneEncryptionInfoProto updated = PBHelperClient.convert(
+        PBHelperClient.convert(current.getSuite()),
+        PBHelperClient.convert(current.getCryptoProtocolVersion()),
+        current.getKeyName(), finished);
+    return XAttrHelper.buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE,
+        updated.toByteArray());
+  }
+
+  /**
+   * Read and parse the encryption zone xattr of {@code iip}'s last inode.
+   *
+   * @param iip resolved path expected to be an encryption zone root
+   * @return the parsed zone encryption info; never null
+   * @throws IOException if the encryption zone xattr is absent or its value
+   *                     cannot be parsed
+   */
+  private static ZoneEncryptionInfoProto getZoneEncryptionInfoProto(
+      final INodesInPath iip) throws IOException {
+    final XAttr zoneXAttr = FSDirXAttrOp
+        .unprotectedGetXAttrByPrefixedName(iip, CRYPTO_XATTR_ENCRYPTION_ZONE);
+    if (zoneXAttr == null) {
+      // The xattr looked up is the encryption zone xattr, not a per-file one.
+      throw new IOException(
+          "Could not find encryption zone XAttr for " + iip.getPath());
+    }
+    try {
+      return ZoneEncryptionInfoProto.parseFrom(zoneXAttr.getValue());
+    } catch (InvalidProtocolBufferException e) {
+      throw new IOException(
+          "Could not parse zone encryption info for " + iip.getPath(), e);
+    }
+  }
+
+  /**
+   * Persist a re-encrypted batch by writing each file's current xattrs to
+   * the edit log.
+   * <p>
+   * Files whose inode can no longer be found (presumably deleted after the
+   * batch was built) are skipped. Previously such an entry threw mid-loop,
+   * leaving the rest of the batch unlogged.
+   *
+   * @param fsd the FSDirectory
+   * @param batch the re-encrypted files; may be null or empty
+   */
+  static void saveFileXAttrsForBatch(FSDirectory fsd,
+      List<FileEdekInfo> batch) {
+    assert fsd.getFSNamesystem().hasWriteLock();
+    if (batch == null || batch.isEmpty()) {
+      return;
+    }
+    for (FileEdekInfo entry : batch) {
+      final INode inode = fsd.getInode(entry.getInodeId());
+      if (inode == null) {
+        // Nothing left to persist for this file; keep going with the rest.
+        NameNode.LOG.info("Cannot find inode " + entry.getInodeId()
+            + ", skip saving xattr for re-encryption");
+        continue;
+      }
+      fsd.getEditLog().logSetXAttrs(inode.getFullPathName(),
+          inode.getXAttrFeature().getXAttrs(), false);
+    }
+  }
+
   /**
    * Set the FileEncryptionInfo for an INode.
    *
    * @param fsd fsdirectory
-   * @param src the path of a directory which will be the root of the
-   *            encryption zone.
    * @param info file encryption information
+   * @param flag action when setting xattr. Either CREATE or REPLACE.
    * @throws IOException
    */
   static void setFileEncryptionInfo(final FSDirectory fsd,
-      final INodesInPath iip, final FileEncryptionInfo info)
-          throws IOException {
+      final INodesInPath iip, final FileEncryptionInfo info,
+      final XAttrSetFlag flag) throws IOException {
     // Make the PB for the xattr
     final HdfsProtos.PerFileEncryptionInfoProto proto =
         PBHelperClient.convertPerFileEncInfo(info);
@@ -238,8 +435,7 @@ final class FSDirEncryptionZoneOp {
     xAttrs.add(fileEncryptionAttr);
     fsd.writeLock();
     try {
-      FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xAttrs,
-                                        EnumSet.of(XAttrSetFlag.CREATE));
+      FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xAttrs, EnumSet.of(flag));
     } finally {
       fsd.writeUnlock();
     }
@@ -500,4 +696,34 @@ final class FSDirEncryptionZoneOp {
       this.edek = edek;
     }
   }
+
+  /**
+   * Look up the current version of the key backing an encryption zone.
+   * <p>
+   * The zone is resolved and validated as an encryption zone root under the
+   * FSDirectory read lock; the key provider (KMS) is contacted only after
+   * the lock has been released.
+   *
+   * @param dir the FSDirectory; its key provider must be configured
+   * @param zone path of the encryption zone root
+   * @param pc the permission checker used to resolve {@code zone}
+   * @return the key provider's current version of the zone's key
+   * @throws FileNotFoundException if {@code zone} does not exist
+   * @throws NullPointerException if the provider reports no current version
+   * @throws IOException if the zone check or the key provider call fails
+   */
+  static KeyVersion getLatestKeyVersion(final FSDirectory dir,
+      final String zone, final FSPermissionChecker pc) throws IOException {
+    assert dir.getProvider() != null;
+    final EncryptionZone ez;
+    dir.readLock();
+    try {
+      final INodesInPath zoneIIP = dir.resolvePath(pc, zone, DirOp.READ);
+      final INode zoneRoot = zoneIIP.getLastINode();
+      if (zoneRoot == null) {
+        throw new FileNotFoundException(zone + " does not exist.");
+      }
+      dir.ezManager.checkEncryptionZoneRoot(zoneRoot, zoneIIP.getPath());
+      ez = FSDirEncryptionZoneOp.getEZForPath(dir, zoneIIP);
+    } finally {
+      dir.readUnlock();
+    }
+    // Contact KMS out of locks.
+    final String keyName = ez.getKeyName();
+    final KeyVersion current = dir.getProvider().getCurrentKey(keyName);
+    Preconditions.checkNotNull(current,
+        "No current key versions for key name " + keyName);
+    return current;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 7ab05d7..012e916 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -397,7 +398,8 @@ class FSDirWriteFileOp {
         newNode.getFileUnderConstructionFeature().getClientName(),
         newNode.getId());
     if (feInfo != null) {
-      FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo);
+      FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo,
+          XAttrSetFlag.CREATE);
     }
     setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
     fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index ddc088c..acdade7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.security.AccessControlException;
@@ -275,6 +276,12 @@ class FSDirXAttrOp {
             PBHelperClient.convert(ezProto.getSuite()),
             PBHelperClient.convert(ezProto.getCryptoProtocolVersion()),
             ezProto.getKeyName());
+
+        if (ezProto.hasReencryptionProto()) {
+          ReencryptionInfoProto reProto = ezProto.getReencryptionProto();
+          fsd.ezManager.getReencryptionStatus()
+              .updateZoneStatus(inode.getId(), iip.getPath(), reProto);
+        }
       }
 
       if (!isFile && SECURITY_XATTR_UNREADABLE_BY_SUPERUSER.equals(xaName)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 87b1156..e6aa533 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -1358,12 +1359,18 @@ public class FSDirectory implements Closeable {
     }
     try {
       final HdfsProtos.ZoneEncryptionInfoProto ezProto =
-          HdfsProtos.ZoneEncryptionInfoProto.parseFrom(
-              xattr.getValue());
+          HdfsProtos.ZoneEncryptionInfoProto.parseFrom(xattr.getValue());
       ezManager.unprotectedAddEncryptionZone(inode.getId(),
           PBHelperClient.convert(ezProto.getSuite()),
           PBHelperClient.convert(ezProto.getCryptoProtocolVersion()),
           ezProto.getKeyName());
+      if (ezProto.hasReencryptionProto()) {
+        final ReencryptionInfoProto reProto = ezProto.getReencryptionProto();
+        // inodes parents may not be loaded if this is done during fsimage
+        // loading so cannot set full path now. Pass in null to indicate that.
+        ezManager.getReencryptionStatus()
+            .updateZoneStatus(inode.getId(), null, reProto);
+      }
     } catch (InvalidProtocolBufferException e) {
       NameNode.LOG.warn("Error parsing protocol buffer of " +
           "EZ XAttr " + xattr.getName() + " dir:" + inode.getFullPathName());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 2313335..12d96d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -89,9 +89,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import org.apache.hadoop.hdfs.protocol.BlocksStats;
 import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.util.Time.now;
@@ -199,6 +201,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
@@ -1230,6 +1233,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       dir.updateCountForQuota();
       // Enable quota checks.
       dir.enableQuotaChecks();
+      dir.ezManager.startReencryptThreads();
+
       if (haEnabled) {
         // Renew all of the leases before becoming active.
         // This is because, while we were in standby mode,
@@ -1321,6 +1326,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // so that the tailer starts from the right spot.
         getFSImage().updateLastAppliedTxIdFromWritten();
       }
+      if (dir != null) {
+        dir.ezManager.stopReencryptThread();
+      }
       if (cacheManager != null) {
         cacheManager.stopMonitorThread();
         cacheManager.clearDirectiveStats();
@@ -7031,6 +7039,84 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  /**
+   * Start or cancel re-encryption of an encryption zone.
+   * <p>
+   * Requires superuser privilege, a NameNode accepting write operations, and
+   * a NameNode outside safemode. An audit event is logged whether or not the
+   * operation succeeds.
+   *
+   * @param zone path of the encryption zone
+   * @param action the re-encryption action; START and CANCEL are handled
+   * @param logRetryCache passed through to the edit log's retry-cache flag
+   * @throws NullPointerException if {@code zone} is null
+   * @throws IOException if a precondition check or the action itself fails
+   */
+  void reencryptEncryptionZone(final String zone, final ReencryptAction action,
+      final boolean logRetryCache) throws IOException {
+    boolean success = false;
+    try {
+      Preconditions.checkNotNull(zone, "zone is null.");
+      // Fail fast before taking any lock; the privilege, operation and
+      // safemode checks are repeated under the write lock in
+      // reencryptEncryptionZoneInt.
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("NameNode in safemode, cannot " + action
+          + " re-encryption on zone " + zone);
+      reencryptEncryptionZoneInt(zone, action, logRetryCache);
+      success = true;
+    } finally {
+      // NOTE(review): the audit command concatenates the action name and
+      // "reencryption" with no separator; changing it would alter audit logs.
+      logAuditEvent(success, action + "reencryption", zone, null, null);
+    }
+  }
+
+  /**
+   * List the re-encryption status of encryption zones, one batch at a time.
+   * Requires superuser privilege. An audit event is logged for successful
+   * and failed calls that get past the initial checks.
+   *
+   * @param prevId cursor from the previous batch (presumably the last zone
+   *               id already returned; TODO confirm), passed through to the
+   *               encryption zone manager
+   * @return the next batch of zone re-encryption statuses
+   * @throws IOException if the caller is not superuser, reads are not
+   *                     allowed, or listing fails
+   */
+  BatchedListEntries<ZoneReencryptionStatus> listReencryptionStatus(
+      final long prevId) throws IOException {
+    final String operationName = "listReencryptionStatus";
+    boolean success = false;
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      // Checks are repeated under the read lock; presumably the NameNode
+      // state can change while waiting for it.
+      // NOTE(review): the superuser re-check looks redundant with the one
+      // performed before taking the lock.
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.READ);
+      final BatchedListEntries<ZoneReencryptionStatus> ret =
+          FSDirEncryptionZoneOp.listReencryptionStatus(dir, prevId);
+      success = true;
+      return ret;
+    } finally {
+      readUnlock(operationName);
+      logAuditEvent(success, operationName, null);
+    }
+  }
+
+  /**
+   * Perform a re-encryption action once the caller-side checks have passed.
+   * <p>
+   * Looks up the zone's current key version outside the FSN lock. It then
+   * re-checks privilege, operation category and safemode under the FSN write
+   * lock and dispatches {@code action}. The edit log is synced after the
+   * lock is released.
+   *
+   * @param zone path of the encryption zone
+   * @param action START or CANCEL; any other action throws IOException
+   * @param logRetryCache passed through to the edit log's retry-cache flag
+   * @throws IOException if no key provider is configured, the zone cannot be
+   *                     resolved or validated, a check fails, or the action
+   *                     is unsupported
+   */
+  private void reencryptEncryptionZoneInt(final String zone,
+      final ReencryptAction action, final boolean logRetryCache)
+      throws IOException {
+    if (getProvider() == null) {
+      throw new IOException("No key provider configured, re-encryption "
+          + "operation is rejected");
+    }
+    FSPermissionChecker pc = getPermissionChecker();
+    // Get the keyVersionName outside the lock. This keyVersionName is used
+    // as the target keyVersion for the entire re-encryption: every EDEK's
+    // keyVersion is compared with it, and the KMS is only contacted when an
+    // EDEK's keyVersion differs.
+    // NOTE(review): this KMS lookup also runs for CANCEL, whose branch does
+    // not use kv, so an unreachable KMS also blocks cancellation.
+    final KeyVersion kv =
+        FSDirEncryptionZoneOp.getLatestKeyVersion(dir, zone, pc);
+    // Ask the provider to invalidate its cache for this key name (assumed
+    // to drop cached key material for it; confirm against the provider).
+    provider.invalidateCache(kv.getName());
+    writeLock();
+    try {
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode(
+          "NameNode in safemode, cannot " + action + " re-encryption on zone "
+              + zone);
+      switch (action) {
+      case START:
+        FSDirEncryptionZoneOp
+            .reencryptEncryptionZone(dir, zone, kv.getVersionName(),
+                logRetryCache);
+        break;
+      case CANCEL:
+        FSDirEncryptionZoneOp
+            .cancelReencryptEncryptionZone(dir, zone, logRetryCache);
+        break;
+      default:
+        throw new IOException(
+            "Re-encryption action " + action + " is not supported");
+      }
+    } finally {
+      writeUnlock();
+    }
+    // Sync outside the FSN lock.
+    getEditLog().logSync();
+  }
+
   /**
    * Set an erasure coding policy on the given path.
    * @param srcArg  The path of the target directory.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 7871202..3fbb7bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -116,6 +117,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.BlocksStats;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -2052,6 +2054,31 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // ClientProtocol
+  public void reencryptEncryptionZone(final String zone,
+      final ReencryptAction action) throws IOException {
+    // RPC entry point for starting or cancelling re-encryption of a zone.
+    checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
+    // A retried call whose earlier attempt already succeeded returns
+    // immediately without repeating the operation.
+    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return;
+    }
+    boolean success = false;
+    try {
+      // Only record into the retry cache when a cache entry exists.
+      namesystem.reencryptEncryptionZone(zone, action, cacheEntry != null);
+      success = true;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
+    }
+  }
+
+  @Override // ClientProtocol
+  public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(
+      final long prevId) throws IOException {
+    // RPC entry point: after checkNNStartup(), delegate to FSNamesystem.
+    checkNNStartup();
+    final BatchedEntries<ZoneReencryptionStatus> page =
+        namesystem.listReencryptionStatus(prevId);
+    return page;
+  }
+
+  @Override // ClientProtocol
   public void setErasureCodingPolicy(String src, String ecPolicyName)
       throws IOException {
     checkNNStartup();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
new file mode 100644
index 0000000..729b894
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -0,0 +1,940 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY;
+
+/**
+ * Class for handling re-encrypt EDEK operations.
+ * <p>
+ * For each EZ, ReencryptionHandler walks the tree in a depth-first order,
+ * and submits batches of (files + existing edeks) as re-encryption tasks
+ * to a thread pool. Each thread in the pool then contacts the KMS to
+ * re-encrypt the edeks. ReencryptionUpdater tracks the tasks and updates
+ * file xattrs with the new edeks.
+ * <p>
+ * File renames are disabled in the EZ that's being re-encrypted. Newly created
+ * files will have new edeks, because the edek cache is drained upon the
+ * submission of a re-encryption command.
+ * <p>
+ * It is assumed only 1 ReencryptionHandler will be running, because:
+ *   1. The bottleneck of the entire re-encryption appears to be on the KMS.
+ *   2. Even with multiple handlers, since updater requires writelock and is
+ * single-threaded, the performance gain is limited.
+ * <p>
+ * This class uses the FSDirectory lock for synchronization.
+ */
+@InterfaceAudience.Private
+public class ReencryptionHandler implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ReencryptionHandler.class);
+
+  // 2000 is based on buffer size = 512 * 1024, and SetXAttr op size is
+  // 100 - 200 bytes (depending on the xattr value).
+  // The buffer size is hard-coded, see outputBufferCapacity from QJM.
+  // A configured batch size above this only produces a warning at startup.
+  private static final int MAX_BATCH_SIZE_WITHOUT_FLOODING = 2000;
+
+  private final EncryptionZoneManager ezManager;
+  private final FSDirectory dir;
+  // Wait between scans for the next zone to process, in milliseconds.
+  private final long interval;
+  // Number of files collected before a batch is submitted to the pool.
+  private final int reencryptBatchSize;
+  // Upper bound on (time holding FSN/FSD read locks) / (total time);
+  // values >= 1.0 disable this kind of throttling, see throttle().
+  private double throttleLimitHandlerRatio;
+  // Number of threads in the pool that contacts the KMS.
+  private final int reencryptThreadPoolSize;
+  // stopwatches for throttling: total elapsed time, and the portion of it
+  // spent holding the read locks.
+  private final StopWatch throttleTimerAll = new StopWatch();
+  private final StopWatch throttleTimerLocked = new StopWatch();
+
+  // Completion service over the KMS thread pool; it is also handed to the
+  // ReencryptionUpdater (presumably to consume completed tasks).
+  private ExecutorCompletionService<ReencryptionTask> batchService;
+  // Work queue of the KMS thread pool; its length is used for throttling.
+  private BlockingQueue<Runnable> taskQueue;
+  // protected by ReencryptionHandler object lock
+  // NOTE(review): the visible accessors assert FSDirectory locks instead of
+  // synchronizing on this object, and throttle() reads the map with no lock
+  // at all (ConcurrentHashMap keeps that safe). Confirm the intended lock.
+  private final Map<Long, ZoneSubmissionTracker> submissions =
+      new ConcurrentHashMap<>();
+
+  // The current batch that the handler is working on. Handler is designed to
+  // be single-threaded, see class javadoc for more details.
+  private ReencryptionBatch currentBatch;
+
+  private final ReencryptionUpdater reencryptionUpdater;
+  // Runs the updater; created by startUpdaterThread().
+  private ExecutorService updaterExecutor;
+
+  // Vars for unit tests.
+  // shouldPauseForTesting: while true, checkPauseForTesting() blocks.
+  // pauseAfterNthSubmission: countdown of batch submissions after which
+  // shouldPauseForTesting is set; 0 means disabled.
+  private volatile boolean shouldPauseForTesting = false;
+  private volatile int pauseAfterNthSubmission = 0;
+
+  /**
+   * Shut down re-encryption work. Every task tracked for every zone is
+   * cancelled first, then the updater executor (if one was started) is
+   * stopped. The FSDirectory write lock must be held.
+   */
+  void stopThreads() {
+    assert dir.hasWriteLock();
+    for (final ZoneSubmissionTracker tracker : submissions.values()) {
+      tracker.cancelAllTasks();
+    }
+    final ExecutorService updater = updaterExecutor;
+    if (updater == null) {
+      return;
+    }
+    updater.shutdownNow();
+  }
+
+  /**
+   * Create a single-threaded daemon executor and run the
+   * {@link ReencryptionUpdater} on it.
+   */
+  void startUpdaterThread() {
+    final ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("reencryptionUpdaterThread #%d");
+    updaterExecutor =
+        Executors.newSingleThreadExecutor(factoryBuilder.build());
+    updaterExecutor.execute(reencryptionUpdater);
+  }
+
+  /**
+   * Ask the handler thread to pause at its next pause check, and wake it if
+   * it is currently waiting on this object.
+   */
+  @VisibleForTesting
+  synchronized void pauseForTesting() {
+    this.shouldPauseForTesting = true;
+    LOG.info("Pausing re-encrypt handler for testing.");
+    this.notify();
+  }
+
+  /**
+   * Clear a test-requested pause and wake the handler thread if it is
+   * waiting on this object.
+   */
+  @VisibleForTesting
+  synchronized void resumeForTesting() {
+    this.shouldPauseForTesting = false;
+    LOG.info("Resuming re-encrypt handler for testing.");
+    this.notify();
+  }
+
+  /**
+   * Arm a countdown so that the handler pauses after {@code count} more
+   * batch submissions. Only one countdown may be armed at a time.
+   */
+  @VisibleForTesting
+  void pauseForTestingAfterNthSubmission(final int count) {
+    assert this.pauseAfterNthSubmission == 0;
+    this.pauseAfterNthSubmission = count;
+  }
+
+  /** Test hook: pause the re-encryption updater. */
+  @VisibleForTesting
+  void pauseUpdaterForTesting() {
+    this.reencryptionUpdater.pauseForTesting();
+  }
+
+  /** Test hook: resume the re-encryption updater. */
+  @VisibleForTesting
+  void resumeUpdaterForTesting() {
+    this.reencryptionUpdater.resumeForTesting();
+  }
+
+  /**
+   * Test hook: have the updater pause after {@code count} checkpoints of the
+   * given zone.
+   */
+  @VisibleForTesting
+  void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
+    this.reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
+  }
+
+  /**
+   * Block the calling thread for as long as a test has requested a pause.
+   * Neither the FSDirectory nor the FSNamesystem read lock may be held here.
+   *
+   * @throws InterruptedException if interrupted while waiting
+   */
+  private synchronized void checkPauseForTesting() throws InterruptedException {
+    assert !dir.hasReadLock();
+    assert !dir.getFSNamesystem().hasReadLock();
+    // Loop on the flag to tolerate spurious wakeups.
+    for (; shouldPauseForTesting; ) {
+      LOG.info("Sleeping in the re-encrypt handler for unit test.");
+      this.wait();
+      LOG.info("Continuing re-encrypt handler after pausing.");
+    }
+  }
+
+  /**
+   * Create the handler: read and validate the re-encryption configuration,
+   * build the thread pool that contacts the KMS, and create the
+   * {@link ReencryptionUpdater} that consumes its results.
+   *
+   * @param ezMgr encryption zone manager; must have a key provider set
+   * @param conf  configuration to read the re-encryption settings from
+   * @throws NullPointerException     if the zone manager has no provider
+   * @throws IllegalArgumentException if a numeric setting is not positive
+   */
+  ReencryptionHandler(final EncryptionZoneManager ezMgr,
+      final Configuration conf) {
+    this.ezManager = ezMgr;
+    Preconditions.checkNotNull(ezManager.getProvider(),
+        "No provider set, cannot re-encrypt");
+    this.dir = ezMgr.getFSDirectory();
+    this.interval =
+        conf.getTimeDuration(DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY,
+            DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    Preconditions.checkArgument(interval > 0,
+        DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY + " is not positive.");
+    this.reencryptBatchSize = conf.getInt(DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY,
+        DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT);
+    Preconditions.checkArgument(reencryptBatchSize > 0,
+        DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY + " is not positive.");
+    if (reencryptBatchSize > MAX_BATCH_SIZE_WITHOUT_FLOODING) {
+      LOG.warn("Re-encryption batch size is {}. It could cause edit log buffer "
+          + "to be full and trigger a logSync within the writelock, greatly "
+          + "impacting namenode throughput.", reencryptBatchSize);
+    }
+    this.throttleLimitHandlerRatio =
+        conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
+            DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT);
+    LOG.info("Configured throttleLimitHandlerRatio={} for re-encryption",
+        throttleLimitHandlerRatio);
+    Preconditions.checkArgument(throttleLimitHandlerRatio > 0.0,
+        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY
+            + " is not positive.");
+    this.reencryptThreadPoolSize =
+        conf.getInt(DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY,
+            DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT);
+    // ThreadPoolExecutor would reject a non-positive size anyway, but with no
+    // hint of which setting is wrong.
+    Preconditions.checkArgument(reencryptThreadPoolSize > 0,
+        DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY + " is not positive.");
+
+    taskQueue = new LinkedBlockingQueue<>();
+    ThreadPoolExecutor threadPool =
+        new ThreadPoolExecutor(reencryptThreadPoolSize, reencryptThreadPoolSize,
+            60, TimeUnit.SECONDS, taskQueue, new Daemon.DaemonFactory() {
+              private final AtomicInteger ind = new AtomicInteger(0);
+
+              @Override
+              public Thread newThread(Runnable r) {
+                Thread t = super.newThread(r);
+                t.setName("reencryption edek Thread-" + ind.getAndIncrement());
+                return t;
+              }
+            }, new ThreadPoolExecutor.CallerRunsPolicy() {
+
+              @Override
+              public void rejectedExecution(Runnable runnable,
+                  ThreadPoolExecutor e) {
+                LOG.info("Execution rejected, executing in current thread");
+                super.rejectedExecution(runnable, e);
+              }
+            });
+
+    // Let idle pool threads exit after the 60s keep-alive above.
+    threadPool.allowCoreThreadTimeOut(true);
+    // Typed (not raw) so the compiler checks the submitted Callables.
+    this.batchService = new ExecutorCompletionService<>(threadPool);
+    reencryptionUpdater =
+        new ReencryptionUpdater(dir, batchService, this, conf);
+    currentBatch = new ReencryptionBatch(reencryptBatchSize);
+  }
+
+  /** @return the re-encryption status owned by the encryption zone manager */
+  ReencryptionStatus getReencryptionStatus() {
+    final ReencryptionStatus status = ezManager.getReencryptionStatus();
+    return status;
+  }
+
+  /**
+   * Cancel re-encryption of a zone. The zone status is marked cancelled and
+   * every task already submitted for it is cancelled.
+   * Requires the FSDirectory write lock.
+   *
+   * @throws IOException if the zone has no status or is already completed
+   */
+  void cancelZone(final long zoneId, final String zoneName) throws IOException {
+    assert dir.hasWriteLock();
+    final ZoneReencryptionStatus status =
+        getReencryptionStatus().getZoneStatus(zoneId);
+    final boolean underReencryption =
+        status != null && status.getState() != State.Completed;
+    if (!underReencryption) {
+      throw new IOException("Zone " + zoneName + " is not under re-encryption");
+    }
+    status.cancel();
+    final ZoneSubmissionTracker tracker = submissions.get(zoneId);
+    if (tracker != null) {
+      tracker.cancelAllTasks();
+    }
+  }
+
+  /**
+   * Drop a zone from re-encryption entirely. Its outstanding tasks are
+   * cancelled, its tracker is forgotten and it is removed from the
+   * re-encryption status. Requires the FSDirectory write lock.
+   */
+  void removeZone(final long zoneId) {
+    assert dir.hasWriteLock();
+    LOG.info("Removing zone {} from re-encryption.", zoneId);
+    final ZoneSubmissionTracker tracker = submissions.get(zoneId);
+    if (tracker != null) {
+      tracker.cancelAllTasks();
+    }
+    submissions.remove(zoneId);
+    final ReencryptionStatus status = getReencryptionStatus();
+    status.removeZone(zoneId);
+  }
+
+  /**
+   * Get the submission tracker of a zone. The FSDirectory read lock must be
+   * held (as reported by {@code FSDirectory#hasReadLock}).
+   *
+   * @param zoneId inode id of the encryption zone root
+   * @return the zone's tracker, or null if none is registered
+   */
+  ZoneSubmissionTracker getTracker(final long zoneId) {
+    // The lock check must be asserted; a bare hasReadLock() call checks
+    // nothing.
+    assert dir.hasReadLock();
+    return unprotectedGetTracker(zoneId);
+  }
+
+  /**
+   * Look up the submission tracker of a zone without checking that the
+   * FSDirectory lock is held. Direct use is meant only for testing, when the
+   * updater checks whether it should pause.
+   *
+   * @param zoneId inode id of the encryption zone root
+   * @return the zone's tracker, or null if none is registered
+   */
+  ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) {
+    final ZoneSubmissionTracker tracker = submissions.get(zoneId);
+    return tracker;
+  }
+
+  /**
+   * Add a dummy tracker (with 1 task that has 0 files to re-encrypt)
+   * for the zone. This is necessary to complete the re-encryption in case
+   * no file in the entire zone needs re-encryption at all. We cannot simply
+   * update zone status and set zone xattrs, because in the handler we only hold
+   * readlock, and setting xattrs requires upgrading to a writelock.
+   * <p>
+   * Requires the FSDirectory read lock, and no tracker may exist for the zone
+   * yet.
+   *
+   * @param zoneId inode id of the encryption zone root
+   */
+  void addDummyTracker(final long zoneId) {
+    assert dir.hasReadLock();
+    assert !submissions.containsKey(zoneId);
+    final ZoneSubmissionTracker zst = new ZoneSubmissionTracker();
+    zst.setSubmissionDone();
+
+    // The dummy batch always stays empty, so it is not pre-sized to
+    // reencryptBatchSize like a real batch.
+    final Future<ReencryptionTask> future = batchService.submit(
+        new EDEKReencryptCallable(zoneId, new ReencryptionBatch(0), this));
+    zst.addTask(future);
+    submissions.put(zoneId, zst);
+  }
+
+  /**
+   * Main loop. After each wait of {@code interval} ms it takes at most 1
+   * zone from the re-encryption status and hands it to
+   * {@link #reencryptEncryptionZone(long)}, which submits that zone's files
+   * for re-encryption.
+   * <p>
+   * RetriableException and SafeModeException mark the zone for retry. Other
+   * IOExceptions are only logged. An interrupt, or any other Throwable, ends
+   * the thread.
+   */
+  @Override
+  public void run() {
+    LOG.info("Starting up re-encrypt thread with interval={} millisecond.",
+        interval);
+    while (true) {
+      try {
+        // Wait for the interval; notify() from the pause/resume test hooks
+        // can wake this thread early.
+        synchronized (this) {
+          wait(interval);
+        }
+        checkPauseForTesting();
+      } catch (InterruptedException ie) {
+        LOG.info("Re-encrypt handler interrupted. Exiting");
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      final Long zoneId;
+      dir.readLock();
+      try {
+        zoneId = getReencryptionStatus().getNextUnprocessedZone();
+        if (zoneId == null) {
+          // empty queue.
+          continue;
+        }
+        LOG.info("Executing re-encrypt commands on zone {}. Current zones:{}",
+            zoneId, getReencryptionStatus());
+      } finally {
+        dir.readUnlock();
+      }
+
+      try {
+        reencryptEncryptionZone(zoneId);
+      } catch (RetriableException | SafeModeException re) {
+        // Transient condition: mark the zone so a later scan retries it.
+        // NOTE(review): this status update runs without the FSDirectory
+        // lock, unlike getNextUnprocessedZone above; confirm that
+        // ReencryptionStatus tolerates it.
+        LOG.info("Re-encryption caught exception, will retry", re);
+        getReencryptionStatus().markZoneForRetry(zoneId);
+      } catch (IOException ioe) {
+        // Only logged; the zone is not marked for retry here.
+        LOG.warn("IOException caught when re-encrypting zone {}", zoneId, ioe);
+      } catch (InterruptedException ie) {
+        LOG.info("Re-encrypt handler interrupted. Exiting.");
+        Thread.currentThread().interrupt();
+        return;
+      } catch (Throwable t) {
+        // Any unexpected failure terminates the handler thread.
+        LOG.error("Re-encrypt handler thread exiting. Exception caught when"
+            + " re-encrypting zone {}.", zoneId, t);
+        return;
+      }
+    }
+  }
+
+  /**
+   * Re-encrypts a zone by recursively iterating all paths inside the zone,
+   * in lexicographic order.
+   * Files are re-encrypted, and subdirs are processed during iteration.
+   * <p>
+   * This method only collects files into batches and submits them; it then
+   * tells the {@link ReencryptionUpdater} that submission for the zone is
+   * done. The FSNamesystem and FSDirectory read locks are held throughout,
+   * except where reencryptDirInt temporarily releases them between batches.
+   *
+   * @param zoneId the Zone's id.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void reencryptEncryptionZone(final long zoneId)
+      throws IOException, InterruptedException {
+    // Restart both throttling stopwatches for this zone.
+    throttleTimerAll.reset().start();
+    throttleTimerLocked.reset();
+    final INode zoneNode;
+    final ZoneReencryptionStatus zs;
+
+    readLock();
+    try {
+      getReencryptionStatus().markZoneStarted(zoneId);
+      zoneNode = dir.getInode(zoneId);
+      // Skip the zone if its root inode is gone or is not a directory.
+      // NOTE(review): the zone is already marked started, and no submission
+      // is marked done on these early returns; confirm its status is
+      // cleaned up elsewhere.
+      if (zoneNode == null) {
+        LOG.info("Directory with id {} removed during re-encrypt, skipping",
+            zoneId);
+        return;
+      }
+      if (!zoneNode.isDirectory()) {
+        LOG.info("Cannot re-encrypt directory with id {} because it's not a"
+            + " directory.", zoneId);
+        return;
+      }
+
+      zs = getReencryptionStatus().getZoneStatus(zoneId);
+      assert zs != null;
+      // Only costly log FullPathName here once, and use id elsewhere.
+      LOG.info("Re-encrypting zone {}(id={})", zoneNode.getFullPathName(),
+          zoneId);
+      if (zs.getLastCheckpointFile() == null) {
+        // new re-encryption: start re-encrypting the zone from the beginning
+        reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME,
+            zs.getEzKeyVersionName());
+      } else {
+        // resuming from a past re-encryption
+        restoreFromLastProcessedFile(zoneId, zs);
+      }
+      // save the last batch and mark complete
+      submitCurrentBatch(zoneId);
+      LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
+      reencryptionUpdater.markZoneSubmissionDone(zoneId);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Finish re-encryption of a zone. This logs a summary, stops tracking the
+   * zone's submissions and returns the xattrs produced by
+   * {@code FSDirEncryptionZoneOp#updateReencryptionFinish}.
+   * Both the FSDirectory and FSNamesystem write locks must be held.
+   *
+   * @param zoneNode root inode of the encryption zone
+   * @return xattrs returned by updateReencryptionFinish for the zone
+   * @throws IOException if updateReencryptionFinish fails
+   */
+  List<XAttr> completeReencryption(final INode zoneNode) throws IOException {
+    assert dir.hasWriteLock();
+    assert dir.getFSNamesystem().hasWriteLock();
+    final long zoneId = zoneNode.getId();
+    final ZoneReencryptionStatus status =
+        getReencryptionStatus().getZoneStatus(zoneId);
+    assert status != null;
+    LOG.info("Re-encryption completed on zone {}. Re-encrypted {} files,"
+            + " failures encountered: {}.", zoneNode.getFullPathName(),
+        status.getFilesReencrypted(), status.getNumReencryptionFailures());
+    // Only the submission tracker is dropped here. Removal of the zone from
+    // reencryptionStatus presumably happens in updateReencryptionFinish;
+    // confirm there.
+    submissions.remove(zoneId);
+    final INodesInPath zoneIIP = INodesInPath.fromINode(zoneNode);
+    return FSDirEncryptionZoneOp.updateReencryptionFinish(dir, zoneIIP, status);
+  }
+
+  /**
+   * Restore the re-encryption from the progress inside ReencryptionStatus.
+   * This means start from exactly the lastProcessedFile (LPF), skipping all
+   * earlier paths in lexicographic order. Lexicographically-later entries of
+   * the directories on the LPF's path are then visited through the cursor
+   * that reencryptDir builds.
+   * <p>
+   * The LPF, or directories above it, may have been deleted since the
+   * checkpoint. In that case iteration resumes in the deepest directory on
+   * the LPF path that still exists, right after the name of the first missing
+   * (or no-longer-a-directory) path component. The previous code dereferenced
+   * the missing LPF inode directly (a NullPointerException).
+   *
+   * @param zoneId inode id of the encryption zone root
+   * @param zs     re-encryption status holding the checkpoint to resume from
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void restoreFromLastProcessedFile(final long zoneId,
+      final ZoneReencryptionStatus zs)
+      throws IOException, InterruptedException {
+    final INodesInPath lpfIIP =
+        dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ);
+    // Walk up from the LPF's parent to the deepest existing directory. When
+    // nothing was removed this is the LPF's parent, and startAfter is the
+    // LPF's own local name, which is the same as the original behavior.
+    for (int i = lpfIIP.length() - 2; i >= 0; --i) {
+      final INode ancestor = lpfIIP.getINode(i);
+      if (ancestor != null && ancestor.isDirectory()) {
+        reencryptDir(ancestor.asDirectory(), zoneId,
+            lpfIIP.getPathComponent(i + 1), zs.getEzKeyVersionName());
+        return;
+      }
+    }
+  }
+
+  /**
+   * Iterate through all files directly inside parent, and recurse down
+   * directories. The listing is done in batch, and can optionally start after
+   * a position.
+   * <p>
+   * Each batch is then sent to the threadpool, where KMS will be contacted and
+   * edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed
+   * from the threadpool.
+   * <p>
+   * The iteration of the inode tree is depth-first. Instead of keeping every
+   * visited INodeDirectory in memory, only a cursor of local names (one per
+   * level below the zone root, then the startAfter of the deepest level) is
+   * kept, to reduce memory consumption.
+   *
+   * @param parent       directory to start in; nothing is done if null
+   * @param zoneId       Id of the EZ inode
+   * @param startAfter   local name in parent after which iteration starts
+   *                     (EMPTY_NAME to start at the first child)
+   * @param ezKeyVerName key version name of the zone's key
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void reencryptDir(final INodeDirectory parent, final long zoneId,
+      byte[] startAfter, final String ezKeyVerName)
+      throws IOException, InterruptedException {
+    if (parent == null) {
+      return;
+    }
+    // Build the cursor: local names from just below the zone root down to
+    // parent, followed by startAfter.
+    final List<byte[]> cursor = new ArrayList<>();
+    cursor.add(startAfter);
+    for (INode node = parent; node.getId() != zoneId; node = node.getParent()) {
+      cursor.add(0, node.getLocalNameBytes());
+    }
+    INode next = reencryptDirInt(zoneId, parent, cursor, ezKeyVerName);
+    while (!cursor.isEmpty()) {
+      if (next == null) {
+        // The locks were released and re-acquired: re-resolve the cursor.
+        next = resolvePaths(zoneId, cursor);
+      }
+      next = reencryptDirInt(zoneId, next, cursor, ezKeyVerName);
+    }
+  }
+
+  /**
+   * Resolve the cursor of re-encryption to an inode.
+   * <p>
+   * The parent of the lowest level startAfter is returned. If something in
+   * the middle of startAfters changed (removed, or no longer a directory),
+   * startAfters is truncated so that the changed name becomes the last
+   * startAfter, and the directory containing it is returned. Iteration then
+   * continues right after the changed entry.
+   *
+   * @param zoneId      Id of the EZ inode.
+   * @param startAfters the cursor, represented by a list of path bytes;
+   *                    truncated in place when a level has changed.
+   * @return the directory in which iteration should continue
+   * @throws FileNotFoundException if the EZ node (furthest parent) is deleted.
+   */
+  private INode resolvePaths(final long zoneId, List<byte[]> startAfters)
+      throws IOException {
+    // If the readlock was reacquired, we need to resolve the paths again
+    // in case things have changed. If our cursor file/dir is changed,
+    // continue from the next one.
+    INode zoneNode = dir.getInode(zoneId);
+    if (zoneNode == null) {
+      throw new FileNotFoundException("Zone " + zoneId + " is deleted.");
+    }
+    INodeDirectory parent = zoneNode.asDirectory();
+    // The last startAfter does not need to be resolved, since the search for
+    // nextChild covers it automatically.
+    for (int i = 0; i < startAfters.size() - 1; ++i) {
+      final INode curr =
+          parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID);
+      if (curr == null || !curr.isDirectory()) {
+        // The inode at this level has changed. Make startAfters.get(i) the
+        // last entry, so iteration continues with the next child of parent,
+        // and drop every deeper level. The old loop
+        // (remove while i < size(), with i growing) dropped only about half
+        // of the deeper levels.
+        startAfters.subList(i + 1, startAfters.size()).clear();
+        break;
+      }
+      parent = curr.asDirectory();
+    }
+    return parent;
+  }
+
+  /**
+   * Hand the current batch to the thread pool and start a new, empty batch.
+   * The returned Future is registered with the zone's tracker, which is
+   * created on first use. An empty batch is not submitted.
+   * Requires the FSDirectory read lock.
+   *
+   * @param zoneId Id of the EZ INode
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void submitCurrentBatch(final long zoneId)
+      throws IOException, InterruptedException {
+    assert dir.hasReadLock();
+    if (currentBatch.isEmpty()) {
+      return;
+    }
+    ZoneSubmissionTracker tracker = submissions.get(zoneId);
+    if (tracker == null) {
+      tracker = new ZoneSubmissionTracker();
+      submissions.put(zoneId, tracker);
+    }
+    final ReencryptionBatch submitted = currentBatch;
+    final Future future = batchService
+        .submit(new EDEKReencryptCallable(zoneId, submitted, this));
+    tracker.addTask(future);
+    LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
+        submitted.getFirstFilePath(), submitted.size(), zoneId);
+    currentBatch = new ReencryptionBatch(reencryptBatchSize);
+    // Count down the test hook. The pause itself must happen outside of the
+    // lock, so only the flag is set here.
+    if (pauseAfterNthSubmission > 0 && --pauseAfterNthSubmission == 0) {
+      shouldPauseForTesting = true;
+    }
+  }
+
+  /**
+   * A group of files, each wrapped as a FileEdekInfo, that one
+   * EDEKReencryptCallable re-encrypts together.
+   */
+  final class ReencryptionBatch {
+    // Full path of the first file added; used only in log messages.
+    private String firstFilePath;
+    private final List<FileEdekInfo> entries;
+
+    ReencryptionBatch() {
+      this(reencryptBatchSize);
+    }
+
+    ReencryptionBatch(int initialCapacity) {
+      entries = new ArrayList<>(initialCapacity);
+    }
+
+    /** Add a file to the batch; requires the FSDirectory read lock. */
+    void add(final INodeFile inode) throws IOException {
+      assert dir.hasReadLock();
+      Preconditions.checkNotNull(inode, "INodeFile is null");
+      final boolean isFirst = entries.isEmpty();
+      if (isFirst) {
+        firstFilePath = inode.getFullPathName();
+      }
+      entries.add(new FileEdekInfo(dir, inode));
+    }
+
+    String getFirstFilePath() {
+      return firstFilePath;
+    }
+
+    boolean isEmpty() {
+      return entries.isEmpty();
+    }
+
+    int size() {
+      return entries.size();
+    }
+
+    void clear() {
+      entries.clear();
+    }
+
+    /** @return the backing list itself, not a copy. */
+    List<FileEdekInfo> getBatch() {
+      return entries;
+    }
+  }
+
+  /**
+   * Simply contacts the KMS for re-encryption. No NN locks held.
+   * The result reports how many files of the batch failed: either none or
+   * the whole batch.
+   */
+  private static class EDEKReencryptCallable
+      implements Callable<ReencryptionTask> {
+    private final long zoneNodeId;
+    private final ReencryptionBatch batch;
+    private final ReencryptionHandler handler;
+
+    EDEKReencryptCallable(final long zoneId,
+        final ReencryptionBatch currentBatch, final ReencryptionHandler rh) {
+      this.zoneNodeId = zoneId;
+      this.batch = currentBatch;
+      this.handler = rh;
+    }
+
+    @Override
+    public ReencryptionTask call() {
+      LOG.info("Processing batched re-encryption for zone {}, batch size {},"
+          + " start:{}", zoneNodeId, batch.size(), batch.getFirstFilePath());
+      if (batch.isEmpty()) {
+        return new ReencryptionTask(zoneNodeId, 0, batch);
+      }
+      final Stopwatch kmsSW = new Stopwatch().start();
+      final boolean succeeded = reencryptEdeks();
+      final int numFailures = succeeded ? 0 : batch.size();
+      final String result = succeeded ? "Completed" : "Failed to";
+      LOG.info("{} re-encrypting one batch of {} edeks from KMS,"
+              + " time consumed: {}, start: {}.", result,
+          batch.size(), kmsSW.stop(), batch.getFirstFilePath());
+      return new ReencryptionTask(zoneNodeId, numFailures, batch);
+    }
+
+    /**
+     * Send the batch's existing edeks to the KMS and store the results back
+     * into the batch entries by position.
+     *
+     * @return false if the provider (or fault injector) threw, true otherwise
+     */
+    private boolean reencryptEdeks() {
+      // Talk to the KMS outside of any lock.
+      final List<FileEdekInfo> files = batch.getBatch();
+      final List<EncryptedKeyVersion> edeks = new ArrayList<>(batch.size());
+      for (FileEdekInfo file : files) {
+        edeks.add(file.getExistingEdek());
+      }
+      // The provider already applies LoadBalancingKMSClientProvider's
+      // retries. If that still fails, just fail this callable.
+      try {
+        handler.ezManager.getProvider().reencryptEncryptedKeys(edeks);
+        EncryptionFaultInjector.getInstance().reencryptEncryptedKeys();
+      } catch (GeneralSecurityException | IOException ex) {
+        LOG.warn("Failed to re-encrypt one batch of {} edeks, start:{}",
+            batch.size(), batch.getFirstFilePath(), ex);
+        return false;
+      }
+      // The provider is expected to have replaced the list's elements in
+      // place; copy them back to the entries by index.
+      for (int idx = 0; idx < files.size(); idx++) {
+        assert idx < edeks.size();
+        files.get(idx).setEdek(edeks.get(idx));
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Iterates the parent directory, and add direct children files to
+   * current batch. If batch size meets configured threshold, a Callable
+   * is created and sent to the thread pool, which will communicate to the KMS
+   * to get new edeks.
+   * <p>
+   * Locks could be released and reacquired when a Callable is created.
+   * <p>
+   * startAfters is updated in place. When descending into a subdirectory,
+   * the last entry is replaced by the subdirectory's name and EMPTY_NAME is
+   * appended. When this directory is finished, the last entry is dropped and
+   * the new last entry is set to this directory's name.
+   *
+   * @param zoneId       Id of the EZ INode
+   * @param curr         directory to iterate (if not a directory, its parent
+   *                     is used; visible callers pass directories)
+   * @param startAfters  the cursor; its last entry is the name to start after
+   * @param ezKeyVerName key version name of the zone's key
+   * @return The directory to continue with next (a subdirectory to descend
+   * into, or the parent once this one is finished), if the lock was held for
+   * the entire call. Null if the lock was released at some point.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private INode reencryptDirInt(final long zoneId, INode curr,
+      List<byte[]> startAfters, final String ezKeyVerName)
+      throws IOException, InterruptedException {
+    assert dir.hasReadLock();
+    assert dir.getFSNamesystem().hasReadLock();
+    Preconditions.checkNotNull(curr, "Current inode can't be null");
+    checkZoneReady(zoneId);
+    final INodeDirectory parent =
+        curr.isDirectory() ? curr.asDirectory() : curr.getParent();
+    ReadOnlyList<INode> children =
+        parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Re-encrypting directory {}", parent.getFullPathName());
+    }
+
+    final byte[] startAfter = startAfters.get(startAfters.size() - 1);
+    boolean lockReleased = false;
+    for (int i = INodeDirectory.nextChild(children, startAfter);
+         i < children.size(); ++i) {
+      final INode inode = children.get(i);
+      if (!reencryptINode(inode, ezKeyVerName)) {
+        // inode wasn't added for re-encryption. Recurse down if it's a dir,
+        // skip otherwise.
+        if (!inode.isDirectory()) {
+          continue;
+        }
+        if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
+          // nested EZ, ignore.
+          LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
+              inode.getFullPathName(), inode.getId());
+          continue;
+        }
+        // add 1 level to the depth-first search.
+        curr = inode;
+        if (!startAfters.isEmpty()) {
+          startAfters.remove(startAfters.size() - 1);
+          startAfters.add(curr.getLocalNameBytes());
+        }
+        startAfters.add(HdfsFileStatus.EMPTY_NAME);
+        return lockReleased ? null : curr;
+      }
+      // Batch full: submit it, then release the locks while throttling /
+      // pausing, and re-acquire them before continuing.
+      if (currentBatch.size() >= reencryptBatchSize) {
+        final byte[] currentStartAfter = inode.getLocalNameBytes();
+        final String parentPath = parent.getFullPathName();
+        submitCurrentBatch(zoneId);
+        lockReleased = true;
+        readUnlock();
+        try {
+          throttle();
+          checkPauseForTesting();
+        } finally {
+          readLock();
+        }
+        checkZoneReady(zoneId);
+
+        // Things could have changed when the lock was released.
+        // Re-resolve the parent inode.
+        FSPermissionChecker pc = dir.getPermissionChecker();
+        INode newParent =
+            dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
+                .getLastINode();
+        if (newParent == null || !newParent.equals(parent)) {
+          // parent dir is deleted or recreated. We're done.
+          return null;
+        }
+        // NOTE(review): children are re-read from the original parent
+        // object, which newParent was just found equal to; confirm that
+        // INode#equals implies the same live directory here.
+        children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
+        // -1 to counter the ++ on the for loop
+        i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
+      }
+    }
+    // Successfully finished this dir, adjust pointers to 1 level up, and
+    // startAfter this dir.
+    startAfters.remove(startAfters.size() - 1);
+    if (!startAfters.isEmpty()) {
+      startAfters.remove(startAfters.size() - 1);
+      startAfters.add(curr.getLocalNameBytes());
+    }
+    curr = curr.getParent();
+    return lockReleased ? null : curr;
+  }
+
+  /**
+   * Take the FSNamesystem read lock, then the FSDirectory read lock, and
+   * start timing how long they are held (the timer is used by throttle()).
+   */
+  private void readLock() {
+    final FSNamesystem fsn = dir.getFSNamesystem();
+    fsn.readLock();
+    dir.readLock();
+    throttleTimerLocked.start();
+  }
+
+  /**
+   * Release the locks in reverse order of readLock() (FSDirectory first,
+   * then FSNamesystem) and stop the locked-time timer.
+   */
+  private void readUnlock() {
+    final FSNamesystem fsn = dir.getFSNamesystem();
+    dir.readUnlock();
+    fsn.readUnlock("reencryptHandler");
+    throttleTimerLocked.stop();
+  }
+
+  /**
+   * Throttles the ReencryptionHandler in 3 aspects:
+   * 1. Prevents generating more Callables than the CPU could possibly handle.
+   * 2. Prevents generating more Callables than the ReencryptionUpdater can
+   *   handle, under its own throttling
+   * 3. Prevents contending FSN/FSD read locks. This is done based on the
+   *   DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY configuration.
+   * <p>
+   * Item 1 and 2 are to control NN heap usage. Both throttling stopwatches
+   * are restarted on exit, unless the ratio is >= 1.0 (aspect 3 disabled).
+   *
+   * @throws InterruptedException if interrupted while sleeping
+   */
+  @VisibleForTesting
+  void throttle() throws InterruptedException {
+    // 1. Wait until the KMS pool's work queue is shorter than #cores.
+    final int numCores = Runtime.getRuntime().availableProcessors();
+    if (taskQueue.size() >= numCores) {
+      LOG.debug("Re-encryption handler throttling because queue size {} is"
+          + " larger than number of cores {}", taskQueue.size(), numCores);
+      while (taskQueue.size() >= numCores) {
+        Thread.sleep(100);
+      }
+    }
+
+    // 2. if tasks are piling up on the updater, don't create new callables
+    // until the queue size goes down.
+    final int maxTasksPiled = numCores * 2;
+    int totalTasks = countTrackedTasks();
+    if (totalTasks >= maxTasksPiled) {
+      LOG.debug("Re-encryption handler throttling because total tasks pending"
+          + " re-encryption updater is {}", totalTasks);
+      while (totalTasks >= maxTasksPiled) {
+        Thread.sleep(500);
+        totalTasks = countTrackedTasks();
+      }
+    }
+
+    // 3. Keep (time holding the read locks) / (total time) under the ratio.
+    if (throttleLimitHandlerRatio >= 1.0) {
+      return;
+    }
+    final long allMs = throttleTimerAll.now(TimeUnit.MILLISECONDS);
+    final long expect = (long) (allMs * throttleLimitHandlerRatio);
+    final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
+          + " throttleTimerAll:{}", expect, actual, allMs);
+    }
+    if (expect - actual < 0) {
+      // in case throttleLimitHandlerRatio is very small, expect will be 0.
+      // so sleepMs should not be calculated from expect, to really meet the
+      // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
+      // should be 1000 - throttleTimerAll.now()
+      final long sleepMs =
+          (long) (actual / throttleLimitHandlerRatio) - throttleTimerAll
+              .now(TimeUnit.MILLISECONDS);
+      // The cast truncates and the clock is read again, so sleepMs can be 0
+      // or negative. Thread.sleep() throws IllegalArgumentException for a
+      // negative value, and that would end the handler thread.
+      if (sleepMs > 0) {
+        LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
+        Thread.sleep(sleepMs);
+      }
+    }
+    throttleTimerAll.reset().start();
+    throttleTimerLocked.reset();
+  }
+
+  /** Sum of the task counts of all zone submission trackers. */
+  private int countTrackedTasks() {
+    int total = 0;
+    for (ZoneSubmissionTracker zst : submissions.values()) {
+      total += zst.getTasks().size();
+    }
+    return total;
+  }
+
+  /**
+   * Process an Inode for re-encryption. Add to current batch if it's an
+   * encrypted file whose edek was not generated with {@code ezKeyVerName},
+   * no-op otherwise.
+   * <p>
+   * The caller must hold the FSDirectory read lock.
+   *
+   * @param inode the inode
+   * @param ezKeyVerName the EZ key version name files are re-encrypted to;
+   *                     files whose edek already uses it are skipped.
+   * @return true if inode is added to currentBatch and should be re-encrypted.
+   * false otherwise: could be inode is not a file, is not encrypted, or
+   * inode's edek's key version is not changed.
+   * @throws IOException propagated from reading the file encryption info.
+   * @throws InterruptedException
+   */
+  private boolean reencryptINode(final INode inode, final String ezKeyVerName)
+      throws IOException, InterruptedException {
+    // Lock precondition; only checked when assertions are enabled.
+    assert dir.hasReadLock();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
+    }
+    if (!inode.isFile()) {
+      return false;
+    }
+    FileEncryptionInfo feInfo = FSDirEncryptionZoneOp
+        .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
+    if (feInfo == null) {
+      LOG.warn("File {} skipped re-encryption because it is not encrypted! "
+          + "This is very likely a bug.", inode.getId());
+      return false;
+    }
+    if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("File {} skipped re-encryption because edek's key version"
+            + " name is not changed.", inode.getFullPathName());
+      }
+      return false;
+    }
+    currentBatch.add(inode.asFile());
+    return true;
+  }
+
+  /**
+   * Verifies that re-encryption of the given zone may proceed, throwing
+   * otherwise. The zone is rejected when:
+   * 1. its re-encryption status is missing (e.g. the EZ was deleted);
+   * 2. its re-encryption has been canceled;
+   * 3. the NN is in safe mode or does not accept write operations.
+   *
+   * @param zoneId inode id of the encryption zone.
+   * @throws IOException if the zone status is missing or canceled, or if the
+   *                     NN is not ready for writes.
+   */
+  void checkZoneReady(final long zoneId)
+      throws RetriableException, SafeModeException, IOException {
+    final ZoneReencryptionStatus zoneStatus =
+        getReencryptionStatus().getZoneStatus(zoneId);
+    if (zoneStatus == null) {
+      throw new IOException("Zone " + zoneId + " status cannot be found.");
+    } else if (zoneStatus.isCanceled()) {
+      throw new IOException("Re-encryption is canceled for zone " + zoneId);
+    }
+    final FSNamesystem namesystem = dir.getFSNamesystem();
+    namesystem.checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt.");
+    // Re-encryption is expected to be cancelled already when the NN becomes
+    // standby; this is a sanity double-check.
+    namesystem.checkOperation(NameNode.OperationCategory.WRITE);
+  }
+
+  /**
+   * Signals that a new zone has been submitted for re-encryption by waking
+   * one thread waiting on this handler's monitor. Presumably this is the
+   * background thread waiting out DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY;
+   * confirm against the wait site.
+   */
+  synchronized void notifyNewSubmission() {
+    LOG.debug("Notifying handler for new re-encryption command.");
+    notify();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
new file mode 100644
index 0000000..690a0e9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionHandler.ReencryptionBatch;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY;
+
+/**
+ * Class for finalizing re-encrypt EDEK operations, by updating file xattrs with
+ * edeks returned from reencryption.
+ * <p>
+ * The tasks are submitted by ReencryptionHandler.
+ * <p>
+ * It is assumed only 1 Updater will be running, since updating file xattrs
+ * requires namespace write lock, and performance gain from multi-threading
+ * is limited.
+ */
+@InterfaceAudience.Private
+public final class ReencryptionUpdater implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ReencryptionUpdater.class);
+
+  private volatile boolean shouldPauseForTesting = false;
+  private volatile int pauseAfterNthCheckpoint = 0;
+  private volatile long pauseZoneId = 0;
+
+  /**
+   * Upper bound on the fraction of wall-clock time the updater may spend
+   * holding the FSN write lock. Values >= 1.0 disable throttling.
+   */
+  private final double throttleLimitRatio;
+  /** Wall-clock time elapsed since the last throttle() reset. */
+  private final StopWatch throttleTimerAll = new StopWatch();
+  /** Time spent holding the FSN write lock since the last throttle() reset. */
+  private final StopWatch throttleTimerLocked = new StopWatch();
+
+  /** Milliseconds to back off before retrying a task on a retriable error. */
+  private volatile long faultRetryInterval = 60000;
+
+  /**
+   * Class to track re-encryption submissions of a single zone. It contains
+   * all the submitted futures, and statistics about how far the futures are
+   * processed.
+   */
+  static final class ZoneSubmissionTracker {
+    /** Whether the submitter has finished submitting tasks for the zone. */
+    private boolean submissionDone;
+    /** Submitted tasks in submission order; removed once checkpointed. */
+    private final LinkedList<Future> tasks;
+    /** Number of tasks whose progress has been checkpointed. */
+    private int numCheckpointed;
+    /** Number of completed tasks the updater has started processing. */
+    private int numFutureDone;
+
+    ZoneSubmissionTracker() {
+      submissionDone = false;
+      tasks = new LinkedList<>();
+      numCheckpointed = 0;
+      numFutureDone = 0;
+    }
+
+    LinkedList<Future> getTasks() {
+      return tasks;
+    }
+
+    /** Requests cancellation, with interruption, of every tracked task. */
+    void cancelAllTasks() {
+      if (!tasks.isEmpty()) {
+        LOG.info("Cancelling {} re-encryption tasks", tasks.size());
+        for (Future f : tasks) {
+          f.cancel(true);
+        }
+      }
+    }
+
+    void addTask(final Future task) {
+      tasks.add(task);
+    }
+
+    /** @return true once submission is done and no tracked tasks remain. */
+    private boolean isCompleted() {
+      return submissionDone && tasks.isEmpty();
+    }
+
+    void setSubmissionDone() {
+      submissionDone = true;
+    }
+  }
+
+  /**
+   * Class representing the task for one batch of a re-encryption command. It
+   * also contains statistics about how far this single batch has been executed.
+   */
+  static final class ReencryptionTask {
+    private final long zoneId;
+    /** Set once the updater has handled this task, successfully or not. */
+    private boolean processed = false;
+    private int numFilesUpdated = 0;
+    private int numFailures = 0;
+    /** Path of the last file whose xattr was updated by this task. */
+    private String lastFile = null;
+    private final ReencryptionBatch batch;
+
+    ReencryptionTask(final long id, final int failures,
+        final ReencryptionBatch theBatch) {
+      zoneId = id;
+      numFailures = failures;
+      batch = theBatch;
+    }
+  }
+
+  /**
+   * Class that encapsulates re-encryption details of a file. It contains the
+   * file inode, stores the initial edek of the file, and the new edek
+   * after re-encryption.
+   * <p>
+   * Assumptions are the object initialization happens when dir lock is held,
+   * and inode is valid and is encrypted during initialization.
+   * <p>
+   * Namespace changes may happen during re-encryption, and if inode is changed
+   * the re-encryption is skipped.
+   */
+  static final class FileEdekInfo {
+    private final long inodeId;
+    private final EncryptedKeyVersion existingEdek;
+    private EncryptedKeyVersion edek = null;
+
+    FileEdekInfo(FSDirectory dir, INodeFile inode) throws IOException {
+      assert dir.hasReadLock();
+      Preconditions.checkNotNull(inode, "INodeFile is null");
+      inodeId = inode.getId();
+      final FileEncryptionInfo fei = FSDirEncryptionZoneOp
+          .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
+      Preconditions.checkNotNull(fei,
+          "FileEncryptionInfo is null for " + inodeId);
+      existingEdek = EncryptedKeyVersion
+          .createForDecryption(fei.getKeyName(), fei.getEzKeyVersionName(),
+              fei.getIV(), fei.getEncryptedDataEncryptionKey());
+    }
+
+    long getInodeId() {
+      return inodeId;
+    }
+
+    EncryptedKeyVersion getExistingEdek() {
+      return existingEdek;
+    }
+
+    void setEdek(final EncryptedKeyVersion ekv) {
+      assert ekv != null;
+      edek = ekv;
+    }
+  }
+
+  /** Makes the updater block in checkPauseForTesting until resumed. */
+  @VisibleForTesting
+  synchronized void pauseForTesting() {
+    shouldPauseForTesting = true;
+    LOG.info("Pausing re-encrypt updater for testing.");
+    notify();
+  }
+
+  /** Releases an updater blocked by pauseForTesting. */
+  @VisibleForTesting
+  synchronized void resumeForTesting() {
+    shouldPauseForTesting = false;
+    LOG.info("Resuming re-encrypt updater for testing.");
+    notify();
+  }
+
+  /**
+   * Arms a pause that triggers once {@code count} tasks of the given zone
+   * have been taken for processing.
+   */
+  @VisibleForTesting
+  void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
+    assert pauseAfterNthCheckpoint == 0;
+    pauseAfterNthCheckpoint = count;
+    pauseZoneId = zoneId;
+  }
+
+  private final FSDirectory dir;
+  private final CompletionService<ReencryptionTask> batchService;
+  private final ReencryptionHandler handler;
+
+  ReencryptionUpdater(final FSDirectory fsd,
+      final CompletionService<ReencryptionTask> service,
+      final ReencryptionHandler rh, final Configuration conf) {
+    dir = fsd;
+    batchService = service;
+    handler = rh;
+    this.throttleLimitRatio =
+        conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY,
+            DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT);
+    Preconditions.checkArgument(throttleLimitRatio > 0.0f,
+        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY
+            + " is not positive.");
+  }
+
+  /**
+   * Called by the submission thread to indicate all tasks have been submitted.
+   * If this is called but no tasks has been submitted, the re-encryption is
+   * considered complete.
+   *
+   * @param zoneId Id of the zone inode.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void markZoneSubmissionDone(final long zoneId)
+      throws IOException, InterruptedException {
+    final ZoneSubmissionTracker tracker = handler.getTracker(zoneId);
+    if (tracker != null) {
+      tracker.setSubmissionDone();
+    } else {
+      // Caller thinks submission is done, but no tasks submitted - meaning
+      // no files in the EZ need to be re-encrypted. Complete directly.
+      handler.addDummyTracker(zoneId);
+    }
+  }
+
+  /**
+   * Main loop: repeatedly takes completed tasks and applies them. Exits on
+   * interruption or on any non-IOException throwable; IOExceptions are logged
+   * and the loop continues.
+   */
+  @Override
+  public void run() {
+    throttleTimerAll.start();
+    while (true) {
+      try {
+        // Assuming single-threaded updater.
+        takeAndProcessTasks();
+      } catch (InterruptedException ie) {
+        LOG.warn("Re-encryption updater thread interrupted. Exiting.");
+        Thread.currentThread().interrupt();
+        return;
+      } catch (IOException ioe) {
+        LOG.warn("Re-encryption updater thread exception.", ioe);
+      } catch (Throwable t) {
+        LOG.error("Re-encryption updater thread exiting.", t);
+        return;
+      }
+    }
+  }
+
+  /**
+   * Process a completed ReencryptionTask. Each inode id is resolved to an INode
+   * object, skip if the inode is deleted.
+   * <p>
+   * Only file xattr is updated by this method. Re-encryption progress is not
+   * updated. Entries that are skipped are removed from the batch so that they
+   * are not persisted later.
+   *
+   * @param zoneNodePath full path of the EZ inode.
+   * @param task     the completed task.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void processTaskEntries(final String zoneNodePath,
+      final ReencryptionTask task) throws IOException, InterruptedException {
+    assert dir.hasWriteLock();
+    if (!task.batch.isEmpty() && task.numFailures == 0) {
+      LOG.debug(
+          "Updating file xattrs for re-encrypting zone {}," + " starting at {}",
+          zoneNodePath, task.batch.getFirstFilePath());
+      for (Iterator<FileEdekInfo> it = task.batch.getBatch().iterator();
+           it.hasNext();) {
+        FileEdekInfo entry = it.next();
+        // resolve the inode again, and skip if it's doesn't exist
+        LOG.trace("Updating {} for re-encryption.", entry.getInodeId());
+        final INode inode = dir.getInode(entry.getInodeId());
+        if (inode == null) {
+          LOG.debug("INode {} doesn't exist, skipping re-encrypt.",
+              entry.getInodeId());
+          // also remove from batch so later it's not saved.
+          it.remove();
+          continue;
+        }
+
+        // Cautiously check file encryption info, and only update if we're sure
+        // it's still using the same edek.
+        Preconditions.checkNotNull(entry.edek);
+        final FileEncryptionInfo fei = FSDirEncryptionZoneOp
+            .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
+        if (fei == null) {
+          // The namespace changed since submission; there is nothing to
+          // compare against, so skip instead of failing with an NPE.
+          LOG.warn("Inode {} has no file encryption info, skipping"
+              + " re-encryption.", entry.getInodeId());
+          it.remove();
+          continue;
+        }
+        if (!fei.getKeyName().equals(entry.edek.getEncryptionKeyName())) {
+          LOG.debug("Inode {} EZ key changed, skipping re-encryption.",
+              entry.getInodeId());
+          it.remove();
+          continue;
+        }
+        if (fei.getEzKeyVersionName()
+            .equals(entry.edek.getEncryptionKeyVersionName())) {
+          LOG.debug(
+              "Inode {} EZ key version unchanged, skipping re-encryption.",
+              entry.getInodeId());
+          it.remove();
+          continue;
+        }
+        if (!Arrays.equals(fei.getEncryptedDataEncryptionKey(),
+            entry.existingEdek.getEncryptedKeyVersion().getMaterial())) {
+          LOG.debug("Inode {} existing edek changed, skipping re-encryption",
+              entry.getInodeId());
+          it.remove();
+          continue;
+        }
+        FileEncryptionInfo newFei = new FileEncryptionInfo(fei.getCipherSuite(),
+            fei.getCryptoProtocolVersion(),
+            entry.edek.getEncryptedKeyVersion().getMaterial(),
+            entry.edek.getEncryptedKeyIv(), fei.getKeyName(),
+            entry.edek.getEncryptionKeyVersionName());
+        final INodesInPath iip = INodesInPath.fromINode(inode);
+        FSDirEncryptionZoneOp
+            .setFileEncryptionInfo(dir, iip, newFei, XAttrSetFlag.REPLACE);
+        task.lastFile = iip.getPath();
+        ++task.numFilesUpdated;
+      }
+
+      LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption,"
+              + " starting:{}.", task.numFilesUpdated, task.batch.size(),
+          zoneNodePath, task.batch.getFirstFilePath());
+    }
+    task.processed = true;
+  }
+
+  /**
+   * Iterate tasks for the given zone, and update progress accordingly. The
+   * checkpoint indicates all files before it are done re-encryption, so it will
+   * be updated to the position where all tasks before are completed.
+   *
+   * @param zoneNode the EZ inode.
+   * @param tracker  the zone submission tracker.
+   * @return the list containing the last checkpointed xattr. Empty if
+   *   no checkpoint happened.
+   * @throws ExecutionException
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private List<XAttr> processCheckpoints(final INode zoneNode,
+      final ZoneSubmissionTracker tracker)
+      throws ExecutionException, IOException, InterruptedException {
+    assert dir.hasWriteLock();
+    final long zoneId = zoneNode.getId();
+    final String zonePath = zoneNode.getFullPathName();
+    final ZoneReencryptionStatus status =
+        handler.getReencryptionStatus().getZoneStatus(zoneId);
+    assert status != null;
+    // always start from the beginning, because the checkpoint means all files
+    // before it are re-encrypted.
+    final LinkedList<Future> tasks = tracker.getTasks();
+    final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+    ListIterator<Future> iter = tasks.listIterator();
+    while (iter.hasNext()) {
+      Future<ReencryptionTask> curr = iter.next();
+      if (!curr.isDone() || !curr.get().processed) {
+        // still has earlier tasks not completed, skip here.
+        break;
+      }
+      ReencryptionTask task = curr.get();
+      LOG.debug("Updating re-encryption checkpoint with completed task."
+          + " last: {} size:{}.", task.lastFile, task.batch.size());
+      assert zoneId == task.zoneId;
+      try {
+        final XAttr xattr = FSDirEncryptionZoneOp
+            .updateReencryptionProgress(dir, zoneNode, status, task.lastFile,
+                task.numFilesUpdated, task.numFailures);
+        xAttrs.clear();
+        xAttrs.add(xattr);
+      } catch (IOException ie) {
+        LOG.warn("Failed to update re-encrypted progress to xattr for zone {}",
+            zonePath, ie);
+        ++task.numFailures;
+      }
+      ++tracker.numCheckpointed;
+      iter.remove();
+    }
+    if (tracker.isCompleted()) {
+      LOG.debug("Removed re-encryption tracker for zone {} because it completed"
+              + " with {} tasks.", zonePath, tracker.numCheckpointed);
+      return handler.completeReencryption(zoneNode);
+    }
+    return xAttrs;
+  }
+
+  /**
+   * Takes the next completed task from the completion service and applies it
+   * under the FSN write lock, retrying on retriable/safe-mode errors.
+   */
+  private void takeAndProcessTasks() throws Exception {
+    final Future<ReencryptionTask> completed = batchService.take();
+    throttle();
+    checkPauseForTesting();
+    // Must be checked before get(): get() on a cancelled future throws
+    // CancellationException, which would terminate the updater thread.
+    if (completed.isCancelled()) {
+      LOG.debug("Skipped a canceled re-encryption task.");
+      return;
+    }
+    final ReencryptionTask task = completed.get();
+
+    boolean shouldRetry;
+    do {
+      dir.getFSNamesystem().writeLock();
+      try {
+        throttleTimerLocked.start();
+        processTask(task);
+        shouldRetry = false;
+      } catch (RetriableException | SafeModeException re) {
+        // Keep retrying until succeed.
+        LOG.info("Exception when processing re-encryption task for zone {}, "
+                + "retrying...", task.zoneId, re);
+        shouldRetry = true;
+      } catch (IOException ioe) {
+        LOG.warn("Failure processing re-encryption task for zone {}",
+            task.zoneId, ioe);
+        ++task.numFailures;
+        task.processed = true;
+        shouldRetry = false;
+      } finally {
+        dir.getFSNamesystem().writeUnlock("reencryptUpdater");
+        throttleTimerLocked.stop();
+      }
+      // logSync regardless, to prevent edit log buffer overflow triggering
+      // logSync inside FSN writelock.
+      dir.getEditLog().logSync();
+      if (shouldRetry) {
+        // Back off only after the FSN write lock is released; sleeping while
+        // holding it would block the whole namespace for the interval.
+        Thread.sleep(faultRetryInterval);
+      }
+    } while (shouldRetry);
+  }
+
+  /**
+   * Applies one task while the FSN write lock is held: updates file xattrs
+   * and the zone checkpoint under the FSD write lock, then persists the
+   * batch's file xattrs and edit-logs the zone progress xattr, if any.
+   * Returns without changes if the zone inode no longer exists.
+   */
+  private void processTask(ReencryptionTask task)
+      throws InterruptedException, ExecutionException, IOException {
+    final List<XAttr> xAttrs;
+    final String zonePath;
+    dir.writeLock();
+    try {
+      handler.checkZoneReady(task.zoneId);
+      final INode zoneNode = dir.getInode(task.zoneId);
+      if (zoneNode == null) {
+        // ez removed.
+        return;
+      }
+      zonePath = zoneNode.getFullPathName();
+      LOG.info("Processing returned re-encryption task for zone {}({}), "
+              + "batch size {}, start:{}", zonePath, task.zoneId,
+          task.batch.size(), task.batch.getFirstFilePath());
+      final ZoneSubmissionTracker tracker =
+          handler.getTracker(zoneNode.getId());
+      Preconditions.checkNotNull(tracker, "zone tracker not found " + zonePath);
+      tracker.numFutureDone++;
+      EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask();
+      processTaskEntries(zonePath, task);
+      EncryptionFaultInjector.getInstance().reencryptUpdaterProcessCheckpoint();
+      xAttrs = processCheckpoints(zoneNode, tracker);
+    } finally {
+      dir.writeUnlock();
+    }
+    FSDirEncryptionZoneOp.saveFileXAttrsForBatch(dir, task.batch.getBatch());
+    if (!xAttrs.isEmpty()) {
+      dir.getEditLog().logSetXAttrs(zonePath, xAttrs, false);
+    }
+  }
+
+  /**
+   * Test hook: blocks on this object's monitor while a pause is requested,
+   * arming the pause first if the configured Nth-checkpoint trigger is hit.
+   * Must be called without holding the FSD/FSN write locks.
+   */
+  private synchronized void checkPauseForTesting() throws InterruptedException {
+    assert !dir.hasWriteLock();
+    assert !dir.getFSNamesystem().hasWriteLock();
+    if (pauseAfterNthCheckpoint != 0) {
+      ZoneSubmissionTracker tracker =
+          handler.unprotectedGetTracker(pauseZoneId);
+      if (tracker != null) {
+        if (tracker.numFutureDone == pauseAfterNthCheckpoint) {
+          shouldPauseForTesting = true;
+          pauseAfterNthCheckpoint = 0;
+        }
+      }
+    }
+    while (shouldPauseForTesting) {
+      LOG.info("Sleeping in the re-encryption updater for unit test.");
+      wait();
+      LOG.info("Continuing re-encryption updater after pausing.");
+    }
+  }
+
+  /**
+   * Throttles the ReencryptionUpdater to prevent from contending FSN/FSD write
+   * locks. This is done by the configuration. On return, both throttle
+   * stopwatches are reset.
+   */
+  private void throttle() throws InterruptedException {
+    if (throttleLimitRatio >= 1.0) {
+      return;
+    }
+
+    // Read the wall-clock stopwatch exactly once: re-reading it later makes
+    // the sleep computation race with elapsed time and can yield a negative
+    // value, which Thread.sleep rejects with IllegalArgumentException.
+    final long allMs = throttleTimerAll.now(TimeUnit.MILLISECONDS);
+    final long expect = (long) (allMs * throttleLimitRatio);
+    final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
+    LOG.debug("Re-encryption updater throttling expect: {}, actual: {},"
+        + " throttleTimerAll:{}", expect, actual, allMs);
+    if (expect - actual < 0) {
+      // in case throttleLimitRatio is very small, expect will be 0.
+      // so sleepMs should not be calculated from expect, to really meet the
+      // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
+      // should be 1000 - throttleTimerAll.now()
+      final long sleepMs =
+          Math.max(0L, (long) (actual / throttleLimitRatio) - allMs);
+      LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
+      Thread.sleep(sleepMs);
+    }
+    throttleTimerAll.reset().start();
+    throttleTimerLocked.reset();
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org