You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/08/24 19:36:39 UTC
[46/50] [abbrv] hadoop git commit: HDFS-10899. Add functionality to
re-encrypt EDEKs.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
index 22039d1..2552cf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
@@ -31,6 +32,7 @@ import java.util.Map;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
@@ -42,15 +44,22 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ZoneEncryptionInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
import org.apache.hadoop.security.SecurityUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.util.Time;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.util.Time.monotonicNow;
/**
@@ -216,18 +225,206 @@ final class FSDirEncryptionZoneOp {
}
}
+ static void reencryptEncryptionZone(final FSDirectory fsd,
+ final String zone, final String keyVersionName,
+ final boolean logRetryCache) throws IOException {
+ final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+ final FSPermissionChecker pc = fsd.getPermissionChecker();
+ fsd.writeLock();
+ try {
+ final INodesInPath iip = fsd.resolvePath(pc, zone, DirOp.WRITE);
+ final XAttr xattr = fsd.ezManager
+ .reencryptEncryptionZone(iip, keyVersionName);
+ xAttrs.add(xattr);
+ } finally {
+ fsd.writeUnlock();
+ }
+ fsd.getEditLog().logSetXAttrs(zone, xAttrs, logRetryCache);
+ }
+
+ static void cancelReencryptEncryptionZone(final FSDirectory fsd,
+ final String zone, final boolean logRetryCache) throws IOException {
+ final List<XAttr> xattrs;
+ final FSPermissionChecker pc = fsd.getPermissionChecker();
+ fsd.writeLock();
+ try {
+ final INodesInPath iip = fsd.resolvePath(pc, zone, DirOp.WRITE);
+ xattrs = fsd.ezManager.cancelReencryptEncryptionZone(iip);
+ } finally {
+ fsd.writeUnlock();
+ }
+ if (xattrs != null && !xattrs.isEmpty()) {
+ fsd.getEditLog().logSetXAttrs(zone, xattrs, logRetryCache);
+ }
+ }
+
+ static BatchedListEntries<ZoneReencryptionStatus> listReencryptionStatus(
+ final FSDirectory fsd, final long prevId)
+ throws IOException {
+ fsd.readLock();
+ try {
+ return fsd.ezManager.listReencryptionStatus(prevId);
+ } finally {
+ fsd.readUnlock();
+ }
+ }
+
+ /**
+ * Update re-encryption progress (submitted). Caller should
+ * logSync after calling this, outside of the FSN lock.
+ * <p>
+ * The reencryption status is updated during SetXAttrs.
+ */
+ static XAttr updateReencryptionSubmitted(final FSDirectory fsd,
+ final INodesInPath iip, final String ezKeyVersionName)
+ throws IOException {
+ assert fsd.hasWriteLock();
+ Preconditions.checkNotNull(ezKeyVersionName, "ezKeyVersionName is null.");
+ final ZoneEncryptionInfoProto zoneProto = getZoneEncryptionInfoProto(iip);
+ Preconditions.checkNotNull(zoneProto, "ZoneEncryptionInfoProto is null.");
+
+ final ReencryptionInfoProto newProto = PBHelperClient
+ .convert(ezKeyVersionName, Time.now(), false, 0, 0, null, null);
+ final ZoneEncryptionInfoProto newZoneProto = PBHelperClient
+ .convert(PBHelperClient.convert(zoneProto.getSuite()),
+ PBHelperClient.convert(zoneProto.getCryptoProtocolVersion()),
+ zoneProto.getKeyName(), newProto);
+
+ final XAttr xattr = XAttrHelper
+ .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, newZoneProto.toByteArray());
+ final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+ xattrs.add(xattr);
+ FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xattrs,
+ EnumSet.of(XAttrSetFlag.REPLACE));
+ return xattr;
+ }
+
+ /**
+ * Update re-encryption progress (start, checkpoint). Caller should
+ * logSync after calling this, outside of the FSN lock.
+ * <p>
+ * The reencryption status is updated during SetXAttrs.
+ * Original reencryption status is passed in to get existing information
+ * such as ezkeyVersionName and submissionTime.
+ */
+ static XAttr updateReencryptionProgress(final FSDirectory fsd,
+ final INode zoneNode, final ZoneReencryptionStatus origStatus,
+ final String lastFile, final long numReencrypted, final long numFailures)
+ throws IOException {
+ assert fsd.hasWriteLock();
+ Preconditions.checkNotNull(zoneNode, "Zone node is null");
+ INodesInPath iip = INodesInPath.fromINode(zoneNode);
+ final ZoneEncryptionInfoProto zoneProto = getZoneEncryptionInfoProto(iip);
+ Preconditions.checkNotNull(zoneProto, "ZoneEncryptionInfoProto is null.");
+ Preconditions.checkNotNull(origStatus, "Null status for " + iip.getPath());
+
+ final ReencryptionInfoProto newProto = PBHelperClient
+ .convert(origStatus.getEzKeyVersionName(),
+ origStatus.getSubmissionTime(), false,
+ origStatus.getFilesReencrypted() + numReencrypted,
+ origStatus.getNumReencryptionFailures() + numFailures, null,
+ lastFile);
+
+ final ZoneEncryptionInfoProto newZoneProto = PBHelperClient
+ .convert(PBHelperClient.convert(zoneProto.getSuite()),
+ PBHelperClient.convert(zoneProto.getCryptoProtocolVersion()),
+ zoneProto.getKeyName(), newProto);
+
+ final XAttr xattr = XAttrHelper
+ .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, newZoneProto.toByteArray());
+ final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+ xattrs.add(xattr);
+ FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xattrs,
+ EnumSet.of(XAttrSetFlag.REPLACE));
+ return xattr;
+ }
+
+ /**
+ * Update re-encryption status on finish (cancel, or 100% re-encrypt).
+ * Caller should logSync after calling this, outside of the FSN lock.
+ * <p>
+ * Original reencryption status is passed in to get existing information,
+ * this should include whether it is finished due to cancellation.
+ * The reencryption status is updated during SetXAttrs for completion time.
+ */
+ static List<XAttr> updateReencryptionFinish(final FSDirectory fsd,
+ final INodesInPath zoneIIP, final ZoneReencryptionStatus origStatus)
+ throws IOException {
+ assert origStatus != null;
+ assert fsd.hasWriteLock();
+ fsd.ezManager.getReencryptionStatus()
+ .markZoneCompleted(zoneIIP.getLastINode().getId());
+ final XAttr xattr =
+ generateNewXAttrForReencryptionFinish(zoneIIP, origStatus);
+ final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+ xattrs.add(xattr);
+ FSDirXAttrOp.unprotectedSetXAttrs(fsd, zoneIIP, xattrs,
+ EnumSet.of(XAttrSetFlag.REPLACE));
+ return xattrs;
+ }
+
+ static XAttr generateNewXAttrForReencryptionFinish(final INodesInPath iip,
+ final ZoneReencryptionStatus status) throws IOException {
+ final ZoneEncryptionInfoProto zoneProto = getZoneEncryptionInfoProto(iip);
+ final ReencryptionInfoProto newRiProto = PBHelperClient
+ .convert(status.getEzKeyVersionName(), status.getSubmissionTime(),
+ status.isCanceled(), status.getFilesReencrypted(),
+ status.getNumReencryptionFailures(), Time.now(), null);
+
+ final ZoneEncryptionInfoProto newZoneProto = PBHelperClient
+ .convert(PBHelperClient.convert(zoneProto.getSuite()),
+ PBHelperClient.convert(zoneProto.getCryptoProtocolVersion()),
+ zoneProto.getKeyName(), newRiProto);
+
+ final XAttr xattr = XAttrHelper
+ .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, newZoneProto.toByteArray());
+ return xattr;
+ }
+
+ private static ZoneEncryptionInfoProto getZoneEncryptionInfoProto(
+ final INodesInPath iip) throws IOException {
+ final XAttr fileXAttr = FSDirXAttrOp
+ .unprotectedGetXAttrByPrefixedName(iip, CRYPTO_XATTR_ENCRYPTION_ZONE);
+ if (fileXAttr == null) {
+ throw new IOException(
+ "Could not find reencryption XAttr for file " + iip.getPath());
+ }
+ try {
+ return ZoneEncryptionInfoProto.parseFrom(fileXAttr.getValue());
+ } catch (InvalidProtocolBufferException e) {
+ throw new IOException(
+ "Could not parse file encryption info for " + "inode " + iip
+ .getPath(), e);
+ }
+ }
+
+ /**
+ * Log the current xattrs of each file in the batch to the edit log.
+ */
+ static void saveFileXAttrsForBatch(FSDirectory fsd,
+ List<FileEdekInfo> batch) {
+ assert fsd.getFSNamesystem().hasWriteLock();
+ if (batch != null && !batch.isEmpty()) {
+ for (FileEdekInfo entry : batch) {
+ final INode inode = fsd.getInode(entry.getInodeId());
+ Preconditions.checkNotNull(inode);
+ fsd.getEditLog().logSetXAttrs(inode.getFullPathName(),
+ inode.getXAttrFeature().getXAttrs(), false);
+ }
+ }
+ }
+
/**
* Set the FileEncryptionInfo for an INode.
*
* @param fsd fsdirectory
- * @param src the path of a directory which will be the root of the
- * encryption zone.
* @param info file encryption information
+ * @param flag action when setting xattr. Either CREATE or REPLACE.
* @throws IOException
*/
static void setFileEncryptionInfo(final FSDirectory fsd,
- final INodesInPath iip, final FileEncryptionInfo info)
- throws IOException {
+ final INodesInPath iip, final FileEncryptionInfo info,
+ final XAttrSetFlag flag) throws IOException {
// Make the PB for the xattr
final HdfsProtos.PerFileEncryptionInfoProto proto =
PBHelperClient.convertPerFileEncInfo(info);
@@ -238,8 +435,7 @@ final class FSDirEncryptionZoneOp {
xAttrs.add(fileEncryptionAttr);
fsd.writeLock();
try {
- FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xAttrs,
- EnumSet.of(XAttrSetFlag.CREATE));
+ FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xAttrs, EnumSet.of(flag));
} finally {
fsd.writeUnlock();
}
@@ -500,4 +696,34 @@ final class FSDirEncryptionZoneOp {
this.edek = edek;
}
}
+
+ /**
+ * Get the current key version for the given EZ by contacting the KMS.
+ * @param dir the FSDirectory used to resolve the zone and reach the provider
+ * @param zone the encryption zone
+ * @param pc the permission checker
+ * @return the current KeyVersion of the zone's key, as returned by the KMS.
+ * @throws IOException if the zone does not exist or the lookup fails
+ */
+ static KeyVersion getLatestKeyVersion(final FSDirectory dir,
+ final String zone, final FSPermissionChecker pc) throws IOException {
+ final EncryptionZone ez;
+ assert dir.getProvider() != null;
+ dir.readLock();
+ try {
+ final INodesInPath iip = dir.resolvePath(pc, zone, DirOp.READ);
+ if (iip.getLastINode() == null) {
+ throw new FileNotFoundException(zone + " does not exist.");
+ }
+ dir.ezManager.checkEncryptionZoneRoot(iip.getLastINode(), iip.getPath());
+ ez = FSDirEncryptionZoneOp.getEZForPath(dir, iip);
+ } finally {
+ dir.readUnlock();
+ }
+ // Contact KMS out of locks.
+ KeyVersion currKv = dir.getProvider().getCurrentKey(ez.getKeyName());
+ Preconditions.checkNotNull(currKv,
+ "No current key versions for key name " + ez.getKeyName());
+ return currKv;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 7ab05d7..012e916 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -397,7 +398,8 @@ class FSDirWriteFileOp {
newNode.getFileUnderConstructionFeature().getClientName(),
newNode.getId());
if (feInfo != null) {
- FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo);
+ FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo,
+ XAttrSetFlag.CREATE);
}
setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index ddc088c..acdade7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.security.AccessControlException;
@@ -275,6 +276,12 @@ class FSDirXAttrOp {
PBHelperClient.convert(ezProto.getSuite()),
PBHelperClient.convert(ezProto.getCryptoProtocolVersion()),
ezProto.getKeyName());
+
+ if (ezProto.hasReencryptionProto()) {
+ ReencryptionInfoProto reProto = ezProto.getReencryptionProto();
+ fsd.ezManager.getReencryptionStatus()
+ .updateZoneStatus(inode.getId(), iip.getPath(), reProto);
+ }
}
if (!isFile && SECURITY_XATTR_UNREADABLE_BY_SUPERUSER.equals(xaName)) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 87b1156..e6aa533 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -1358,12 +1359,18 @@ public class FSDirectory implements Closeable {
}
try {
final HdfsProtos.ZoneEncryptionInfoProto ezProto =
- HdfsProtos.ZoneEncryptionInfoProto.parseFrom(
- xattr.getValue());
+ HdfsProtos.ZoneEncryptionInfoProto.parseFrom(xattr.getValue());
ezManager.unprotectedAddEncryptionZone(inode.getId(),
PBHelperClient.convert(ezProto.getSuite()),
PBHelperClient.convert(ezProto.getCryptoProtocolVersion()),
ezProto.getKeyName());
+ if (ezProto.hasReencryptionProto()) {
+ final ReencryptionInfoProto reProto = ezProto.getReencryptionProto();
+ // The inode's parents may not be loaded if this is done during fsimage
+ // loading, so cannot set full path now. Pass in null to indicate that.
+ ezManager.getReencryptionStatus()
+ .updateZoneStatus(inode.getId(), null, reProto);
+ }
} catch (InvalidProtocolBufferException e) {
NameNode.LOG.warn("Error parsing protocol buffer of " +
"EZ XAttr " + xattr.getName() + " dir:" + inode.getFullPathName());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 2313335..12d96d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -89,9 +89,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.hdfs.protocol.BlocksStats;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.util.Time.now;
@@ -199,6 +201,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
@@ -1230,6 +1233,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
dir.updateCountForQuota();
// Enable quota checks.
dir.enableQuotaChecks();
+ dir.ezManager.startReencryptThreads();
+
if (haEnabled) {
// Renew all of the leases before becoming active.
// This is because, while we were in standby mode,
@@ -1321,6 +1326,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// so that the tailer starts from the right spot.
getFSImage().updateLastAppliedTxIdFromWritten();
}
+ if (dir != null) {
+ dir.ezManager.stopReencryptThread();
+ }
if (cacheManager != null) {
cacheManager.stopMonitorThread();
cacheManager.clearDirectiveStats();
@@ -7031,6 +7039,84 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
+ void reencryptEncryptionZone(final String zone, final ReencryptAction action,
+ final boolean logRetryCache) throws IOException {
+ boolean success = false;
+ try {
+ Preconditions.checkNotNull(zone, "zone is null.");
+ checkSuperuserPrivilege();
+ checkOperation(OperationCategory.WRITE);
+ checkNameNodeSafeMode("NameNode in safemode, cannot " + action
+ + " re-encryption on zone " + zone);
+ reencryptEncryptionZoneInt(zone, action, logRetryCache);
+ success = true;
+ } finally {
+ logAuditEvent(success, action + "reencryption", zone, null, null);
+ }
+ }
+
+ BatchedListEntries<ZoneReencryptionStatus> listReencryptionStatus(
+ final long prevId) throws IOException {
+ final String operationName = "listReencryptionStatus";
+ boolean success = false;
+ checkSuperuserPrivilege();
+ checkOperation(OperationCategory.READ);
+ readLock();
+ try {
+ checkSuperuserPrivilege();
+ checkOperation(OperationCategory.READ);
+ final BatchedListEntries<ZoneReencryptionStatus> ret =
+ FSDirEncryptionZoneOp.listReencryptionStatus(dir, prevId);
+ success = true;
+ return ret;
+ } finally {
+ readUnlock(operationName);
+ logAuditEvent(success, operationName, null);
+ }
+ }
+
+ private void reencryptEncryptionZoneInt(final String zone,
+ final ReencryptAction action, final boolean logRetryCache)
+ throws IOException {
+ if (getProvider() == null) {
+ throw new IOException("No key provider configured, re-encryption "
+ + "operation is rejected");
+ }
+ FSPermissionChecker pc = getPermissionChecker();
+ // get keyVersionName out of the lock. This keyVersionName will be used
+ // as the target keyVersion for the entire re-encryption.
+ // This means all edek's keyVersion will be compared with this one, and
+ // kms is only contacted if the edek's keyVersion is different.
+ final KeyVersion kv =
+ FSDirEncryptionZoneOp.getLatestKeyVersion(dir, zone, pc);
+ provider.invalidateCache(kv.getName());
+ writeLock();
+ try {
+ checkSuperuserPrivilege();
+ checkOperation(OperationCategory.WRITE);
+ checkNameNodeSafeMode(
+ "NameNode in safemode, cannot " + action + " re-encryption on zone "
+ + zone);
+ switch (action) {
+ case START:
+ FSDirEncryptionZoneOp
+ .reencryptEncryptionZone(dir, zone, kv.getVersionName(),
+ logRetryCache);
+ break;
+ case CANCEL:
+ FSDirEncryptionZoneOp
+ .cancelReencryptEncryptionZone(dir, zone, logRetryCache);
+ break;
+ default:
+ throw new IOException(
+ "Re-encryption action " + action + " is not supported");
+ }
+ } finally {
+ writeUnlock();
+ }
+ getEditLog().logSync();
+ }
+
/**
* Set an erasure coding policy on the given path.
* @param srcArg The path of the target directory.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 7871202..3fbb7bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -116,6 +117,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.BlocksStats;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -2052,6 +2054,31 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
+ public void reencryptEncryptionZone(final String zone,
+ final ReencryptAction action) throws IOException {
+ checkNNStartup();
+ namesystem.checkOperation(OperationCategory.WRITE);
+ final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return;
+ }
+ boolean success = false;
+ try {
+ namesystem.reencryptEncryptionZone(zone, action, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
+ }
+
+ @Override // ClientProtocol
+ public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(
+ final long prevId) throws IOException {
+ checkNNStartup();
+ return namesystem.listReencryptionStatus(prevId);
+ }
+
+ @Override // ClientProtocol
public void setErasureCodingPolicy(String src, String ecPolicyName)
throws IOException {
checkNNStartup();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
new file mode 100644
index 0000000..729b894
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -0,0 +1,940 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY;
+
+/**
+ * Class for handling re-encrypt EDEK operations.
+ * <p>
+ * For each EZ, ReencryptionHandler walks the tree in a depth-first order,
+ * and submits batches of (files + existing edeks) as re-encryption tasks
+ * to a thread pool. Each thread in the pool then contacts the KMS to
+ * re-encrypt the edeks. ReencryptionUpdater tracks the tasks and updates
+ * file xattrs with the new edeks.
+ * <p>
+ * File renames are disabled in the EZ that's being re-encrypted. Newly created
+ * files will have new edeks, because the edek cache is drained upon the
+ * submission of a re-encryption command.
+ * <p>
+ * It is assumed only 1 ReencryptionHandler will be running, because:
+ * 1. The bottleneck of the entire re-encryption appears to be on the KMS.
+ * 2. Even with multiple handlers, since updater requires writelock and is
+ * single-threaded, the performance gain is limited.
+ * <p>
+ * This class uses the FSDirectory lock for synchronization.
+ */
+@InterfaceAudience.Private
+public class ReencryptionHandler implements Runnable {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ReencryptionHandler.class);
+
+ // 2000 is based on buffer size = 512 * 1024, and SetXAttr op size is
+ // 100 - 200 bytes (depending on the xattr value).
+ // The buffer size is hard-coded, see outputBufferCapacity from QJM.
+ // Batch sizes above this value only trigger a warning in the constructor.
+ private static final int MAX_BATCH_SIZE_WITHOUT_FLOODING = 2000;
+
+ private final EncryptionZoneManager ezManager;
+ private final FSDirectory dir;
+ // Wait time in milliseconds between scans of the main loop, read from
+ // DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY.
+ private final long interval;
+ // Number of files per submitted batch, read from
+ // DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY.
+ private final int reencryptBatchSize;
+ // Read from DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY; a value
+ // >= 1.0 disables the lock-time throttling step in throttle().
+ private double throttleLimitHandlerRatio;
+ // Size of the EDEK re-encryption thread pool, read from
+ // DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY.
+ private final int reencryptThreadPoolSize;
+ // stopwatches for throttling: throttleTimerAll measures elapsed time since
+ // the last reset, throttleTimerLocked runs only between readLock() and
+ // readUnlock().
+ private final StopWatch throttleTimerAll = new StopWatch();
+ private final StopWatch throttleTimerLocked = new StopWatch();
+
+ private ExecutorCompletionService<ReencryptionTask> batchService;
+ // Work queue backing the thread pool behind batchService.
+ private BlockingQueue<Runnable> taskQueue;
+ // Submission trackers keyed by zone inode id.
+ // NOTE(review): this was documented as "protected by ReencryptionHandler
+ // object lock", but the accesses in this class are not inside synchronized
+ // blocks; they rely on the ConcurrentHashMap and on FSDirectory lock
+ // asserts. Confirm the intended locking discipline.
+ private final Map<Long, ZoneSubmissionTracker> submissions =
+ new ConcurrentHashMap<>();
+
+ // The current batch that the handler is working on. Handler is designed to
+ // be single-threaded, see class javadoc for more details.
+ private ReencryptionBatch currentBatch;
+
+ private final ReencryptionUpdater reencryptionUpdater;
+ // Executor running reencryptionUpdater; null until startUpdaterThread().
+ private ExecutorService updaterExecutor;
+
+ // Vars for unit tests.
+ private volatile boolean shouldPauseForTesting = false;
+ private volatile int pauseAfterNthSubmission = 0;
+
+ /**
+  * Cancels all tracked EDEK re-encryption tasks of every zone, then shuts
+  * down the re-encryption updater executor if one has been started.
+  * Asserts that the FSDirectory write lock is held.
+  */
+ void stopThreads() {
+   assert dir.hasWriteLock();
+   for (ZoneSubmissionTracker tracker : submissions.values()) {
+     tracker.cancelAllTasks();
+   }
+   if (updaterExecutor == null) {
+     // Updater was never started, nothing to shut down.
+     return;
+   }
+   updaterExecutor.shutdownNow();
+ }
+
+ /**
+ * Start the re-encryption updater thread.
+ */
+ void startUpdaterThread() {
+ updaterExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("reencryptionUpdaterThread #%d").build());
+ updaterExecutor.execute(reencryptionUpdater);
+ }
+
+ /**
+  * Test hook: raises the pause flag and notifies one thread waiting on this
+  * handler's monitor.
+  */
+ @VisibleForTesting
+ void pauseForTesting() {
+   synchronized (this) {
+     shouldPauseForTesting = true;
+     LOG.info("Pausing re-encrypt handler for testing.");
+     this.notify();
+   }
+ }
+
+ /**
+  * Test hook: clears the pause flag and notifies one thread waiting on this
+  * handler's monitor, so a handler blocked in checkPauseForTesting() can
+  * continue.
+  */
+ @VisibleForTesting
+ void resumeForTesting() {
+   synchronized (this) {
+     shouldPauseForTesting = false;
+     LOG.info("Resuming re-encrypt handler for testing.");
+     this.notify();
+   }
+ }
+
+ /**
+ * Test hook: arms a countdown so that the pause flag is raised once the
+ * given number of batches have been submitted (see submitCurrentBatch).
+ * Asserts that no countdown is currently armed.
+ *
+ * @param count number of batch submissions after which to pause
+ */
+ @VisibleForTesting
+ void pauseForTestingAfterNthSubmission(final int count) {
+ assert pauseAfterNthSubmission == 0;
+ pauseAfterNthSubmission = count;
+ }
+
+ /** Test hook: forwards the pause request to the re-encryption updater. */
+ @VisibleForTesting
+ void pauseUpdaterForTesting() {
+ reencryptionUpdater.pauseForTesting();
+ }
+
+ /** Test hook: forwards the resume request to the re-encryption updater. */
+ @VisibleForTesting
+ void resumeUpdaterForTesting() {
+ reencryptionUpdater.resumeForTesting();
+ }
+
+ /**
+ * Test hook: forwards to the re-encryption updater's
+ * pauseForTestingAfterNthCheckpoint with the same arguments.
+ *
+ * @param zoneId id of the zone the request applies to
+ * @param count checkpoint count passed through to the updater
+ */
+ @VisibleForTesting
+ void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
+ reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
+ }
+
+ /**
+ * Blocks on this handler's monitor for as long as the test pause flag is
+ * set. Asserts that neither the FSDirectory nor the FSNamesystem read lock
+ * is held, so the handler never sleeps while holding them.
+ *
+ * @throws InterruptedException if interrupted while waiting
+ */
+ private synchronized void checkPauseForTesting() throws InterruptedException {
+ assert !dir.hasReadLock();
+ assert !dir.getFSNamesystem().hasReadLock();
+ // Loop rather than a single wait(): the flag is re-checked after every
+ // wake-up.
+ while (shouldPauseForTesting) {
+ LOG.info("Sleeping in the re-encrypt handler for unit test.");
+ wait();
+ LOG.info("Continuing re-encrypt handler after pausing.");
+ }
+ }
+
+ /**
+  * Builds the handler: reads and validates the re-encryption settings,
+  * creates the EDEK re-encryption thread pool and completion service, and
+  * constructs the updater and the first (empty) batch.
+  *
+  * @param ezMgr the encryption zone manager; must have a key provider set
+  * @param conf configuration to read the re-encryption settings from
+  * @throws NullPointerException if the manager has no key provider
+  * @throws IllegalArgumentException if the sleep interval, batch size,
+  *         throttle ratio or thread pool size is not positive
+  */
+ ReencryptionHandler(final EncryptionZoneManager ezMgr,
+     final Configuration conf) {
+   this.ezManager = ezMgr;
+   Preconditions.checkNotNull(ezManager.getProvider(),
+       "No provider set, cannot re-encrypt");
+   this.dir = ezMgr.getFSDirectory();
+   this.interval =
+       conf.getTimeDuration(DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY,
+           DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT,
+           TimeUnit.MILLISECONDS);
+   Preconditions.checkArgument(interval > 0,
+       DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY + " is not positive.");
+   this.reencryptBatchSize = conf.getInt(DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY,
+       DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT);
+   Preconditions.checkArgument(reencryptBatchSize > 0,
+       DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY + " is not positive.");
+   if (reencryptBatchSize > MAX_BATCH_SIZE_WITHOUT_FLOODING) {
+     // Allowed, but warn: large batches may overflow the edit log buffer.
+     LOG.warn("Re-encryption batch size is {}. It could cause edit log buffer "
+         + "to be full and trigger a logSync within the writelock, greatly "
+         + "impacting namenode throughput.", reencryptBatchSize);
+   }
+   this.throttleLimitHandlerRatio =
+       conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
+           DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT);
+   LOG.info("Configured throttleLimitHandlerRatio={} for re-encryption",
+       throttleLimitHandlerRatio);
+   Preconditions.checkArgument(throttleLimitHandlerRatio > 0.0f,
+       DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY
+           + " is not positive.");
+   this.reencryptThreadPoolSize =
+       conf.getInt(DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY,
+           DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT);
+   // ThreadPoolExecutor would reject a non-positive size with a bare
+   // IllegalArgumentException; fail with the offending key name instead.
+   Preconditions.checkArgument(reencryptThreadPoolSize > 0,
+       DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY + " is not positive.");
+
+   taskQueue = new LinkedBlockingQueue<>();
+   ThreadPoolExecutor threadPool =
+       new ThreadPoolExecutor(reencryptThreadPoolSize, reencryptThreadPoolSize,
+           60, TimeUnit.SECONDS, taskQueue, new Daemon.DaemonFactory() {
+             private final AtomicInteger ind = new AtomicInteger(0);
+
+             @Override
+             public Thread newThread(Runnable r) {
+               Thread t = super.newThread(r);
+               t.setName("reencryption edek Thread-" + ind.getAndIncrement());
+               return t;
+             }
+           }, new ThreadPoolExecutor.CallerRunsPolicy() {
+
+             @Override
+             public void rejectedExecution(Runnable runnable,
+                 ThreadPoolExecutor e) {
+               LOG.info("Execution rejected, executing in current thread");
+               super.rejectedExecution(runnable, e);
+             }
+           });
+
+   // Let idle pool threads exit after the 60s keep-alive.
+   threadPool.allowCoreThreadTimeOut(true);
+   this.batchService = new ExecutorCompletionService<>(threadPool);
+   reencryptionUpdater =
+       new ReencryptionUpdater(dir, batchService, this, conf);
+   currentBatch = new ReencryptionBatch(reencryptBatchSize);
+ }
+
+ /**
+ * @return the re-encryption status object held by the encryption zone
+ * manager.
+ */
+ ReencryptionStatus getReencryptionStatus() {
+ return ezManager.getReencryptionStatus();
+ }
+
+ /**
+  * Cancels the re-encryption of a zone: marks its status as canceled and
+  * cancels every task tracked for it. Asserts that the FSDirectory write
+  * lock is held.
+  *
+  * @param zoneId inode id of the zone
+  * @param zoneName zone name, used only in the error message
+  * @throws IOException if the zone has no status or is already completed
+  */
+ void cancelZone(final long zoneId, final String zoneName) throws IOException {
+   assert dir.hasWriteLock();
+   final ZoneReencryptionStatus status =
+       getReencryptionStatus().getZoneStatus(zoneId);
+   final boolean underReencryption =
+       status != null && status.getState() != State.Completed;
+   if (!underReencryption) {
+     throw new IOException("Zone " + zoneName + " is not under re-encryption");
+   }
+   status.cancel();
+   final ZoneSubmissionTracker tracker = submissions.get(zoneId);
+   if (tracker == null) {
+     // Nothing has been submitted for this zone yet.
+     return;
+   }
+   tracker.cancelAllTasks();
+ }
+
+ void removeZone(final long zoneId) {
+ assert dir.hasWriteLock();
+ LOG.info("Removing zone {} from re-encryption.", zoneId);
+ ZoneSubmissionTracker zst = submissions.get(zoneId);
+ if (zst != null) {
+ zst.cancelAllTasks();
+ }
+ submissions.remove(zoneId);
+ getReencryptionStatus().removeZone(zoneId);
+ }
+
+ /**
+  * Returns the submission tracker of a zone. Asserts that the FSDirectory
+  * read lock is held; use {@link #unprotectedGetTracker(long)} when it is
+  * not.
+  *
+  * @param zoneId inode id of the zone
+  * @return the tracker, or null if the zone has none
+  */
+ ZoneSubmissionTracker getTracker(final long zoneId) {
+   // The result of hasReadLock() used to be discarded; it was meant to be
+   // asserted, like the other lock checks in this class.
+   assert dir.hasReadLock();
+   return unprotectedGetTracker(zoneId);
+ }
+
+ /**
+ * Get the tracker without holding the FSDirectory lock. This is only used
+ * for testing, when the updater checks whether it should pause.
+ *
+ * @param zoneId inode id of the zone
+ * @return the tracker stored for the zone, or null if there is none
+ */
+ ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) {
+ return submissions.get(zoneId);
+ }
+
+ /**
+  * Add a dummy tracker (with 1 task that has 0 files to re-encrypt)
+  * for the zone. This is necessary to complete the re-encryption in case
+  * no file in the entire zone needs re-encryption at all. We cannot simply
+  * update zone status and set zone xattrs, because in the handler we only hold
+  * readlock, and setting xattrs requires upgrading to a writelock.
+  * <p>
+  * Asserts that the FSDirectory read lock is held and that the zone has no
+  * tracker yet.
+  *
+  * @param zoneId inode id of the zone to add the dummy tracker for
+  */
+ void addDummyTracker(final long zoneId) {
+   assert dir.hasReadLock();
+   assert !submissions.containsKey(zoneId);
+   final ZoneSubmissionTracker zst = new ZoneSubmissionTracker();
+   // The single empty task below is the whole submission for this zone.
+   zst.setSubmissionDone();
+
+   final Future<ReencryptionTask> future = batchService.submit(
+       new EDEKReencryptCallable(zoneId, new ReencryptionBatch(), this));
+   zst.addTask(future);
+   submissions.put(zoneId, zst);
+ }
+
+ /**
+ * Main loop. It takes at most 1 zone per scan, and executes until the zone
+ * is completed.
+ * See {@link #reencryptEncryptionZone(long)}.
+ * <p>
+ * The loop exits when the thread is interrupted, or when an unexpected
+ * Throwable escapes the re-encryption of a zone.
+ */
+ @Override
+ public void run() {
+ LOG.info("Starting up re-encrypt thread with interval={} millisecond.",
+ interval);
+ while (true) {
+ try {
+ // Sleep for one interval, or until notified through this monitor.
+ synchronized (this) {
+ wait(interval);
+ }
+ checkPauseForTesting();
+ } catch (InterruptedException ie) {
+ LOG.info("Re-encrypt handler interrupted. Exiting");
+ Thread.currentThread().interrupt();
+ return;
+ }
+
+ final Long zoneId;
+ // Only the lookup of the next zone happens under this read lock;
+ // reencryptEncryptionZone takes its own locks.
+ dir.readLock();
+ try {
+ zoneId = getReencryptionStatus().getNextUnprocessedZone();
+ if (zoneId == null) {
+ // empty queue.
+ continue;
+ }
+ LOG.info("Executing re-encrypt commands on zone {}. Current zones:{}",
+ zoneId, getReencryptionStatus());
+ } finally {
+ dir.readUnlock();
+ }
+
+ try {
+ reencryptEncryptionZone(zoneId);
+ } catch (RetriableException | SafeModeException re) {
+ LOG.info("Re-encryption caught exception, will retry", re);
+ getReencryptionStatus().markZoneForRetry(zoneId);
+ } catch (IOException ioe) {
+ // Logged only; the loop moves on to the next scan.
+ LOG.warn("IOException caught when re-encrypting zone {}", zoneId, ioe);
+ } catch (InterruptedException ie) {
+ LOG.info("Re-encrypt handler interrupted. Exiting.");
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Throwable t) {
+ LOG.error("Re-encrypt handler thread exiting. Exception caught when"
+ + " re-encrypting zone {}.", zoneId, t);
+ return;
+ }
+ }
+ }
+
+ /**
+ * Re-encrypts a zone by recursively iterating all paths inside the zone,
+ * in lexicographic order.
+ * Files are re-encrypted, and subdirs are processed during iteration.
+ * <p>
+ * Runs under the FSNamesystem and FSDirectory read locks (which may be
+ * released and reacquired while iterating), and resets the throttle
+ * stopwatches before starting.
+ *
+ * @param zoneId the Zone's id.
+ * @throws IOException if iterating the zone or submitting batches fails
+ * @throws InterruptedException if interrupted while throttling or pausing
+ */
+ void reencryptEncryptionZone(final long zoneId)
+ throws IOException, InterruptedException {
+ throttleTimerAll.reset().start();
+ throttleTimerLocked.reset();
+ final INode zoneNode;
+ final ZoneReencryptionStatus zs;
+
+ readLock();
+ try {
+ getReencryptionStatus().markZoneStarted(zoneId);
+ zoneNode = dir.getInode(zoneId);
+ // Bail out if the zone inode is gone or is not a directory.
+ if (zoneNode == null) {
+ LOG.info("Directory with id {} removed during re-encrypt, skipping",
+ zoneId);
+ return;
+ }
+ if (!zoneNode.isDirectory()) {
+ LOG.info("Cannot re-encrypt directory with id {} because it's not a"
+ + " directory.", zoneId);
+ return;
+ }
+
+ zs = getReencryptionStatus().getZoneStatus(zoneId);
+ assert zs != null;
+ // Only costly log FullPathName here once, and use id elsewhere.
+ LOG.info("Re-encrypting zone {}(id={})", zoneNode.getFullPathName(),
+ zoneId);
+ if (zs.getLastCheckpointFile() == null) {
+ // new re-encryption: start from the beginning of the zone
+ reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME,
+ zs.getEzKeyVersionName());
+ } else {
+ // resuming from a past re-encryption
+ restoreFromLastProcessedFile(zoneId, zs);
+ }
+ // save the last batch and mark complete
+ submitCurrentBatch(zoneId);
+ LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
+ reencryptionUpdater.markZoneSubmissionDone(zoneId);
+ } finally {
+ readUnlock();
+ }
+ }
+
+ /**
+  * Finishes the re-encryption of a zone: logs the totals, forgets the zone's
+  * submission tracker and delegates to
+  * FSDirEncryptionZoneOp.updateReencryptionFinish. Asserts that both the
+  * FSDirectory and the FSNamesystem write locks are held.
+  *
+  * @param zoneNode inode of the zone that finished
+  * @return the xattrs returned by updateReencryptionFinish
+  * @throws IOException if updating the zone fails
+  */
+ List<XAttr> completeReencryption(final INode zoneNode) throws IOException {
+   assert dir.hasWriteLock();
+   assert dir.getFSNamesystem().hasWriteLock();
+   final Long id = zoneNode.getId();
+   final ZoneReencryptionStatus status =
+       getReencryptionStatus().getZoneStatus(id);
+   assert status != null;
+   LOG.info("Re-encryption completed on zone {}. Re-encrypted {} files,"
+       + " failures encountered: {}.", zoneNode.getFullPathName(),
+       status.getFilesReencrypted(), status.getNumReencryptionFailures());
+   submissions.remove(id);
+   // NOTE(review): the original comment says the zone is also removed from
+   // reencryptionStatus here; presumably done inside
+   // updateReencryptionFinish - confirm.
+   final INodesInPath zoneIIP = INodesInPath.fromINode(zoneNode);
+   return FSDirEncryptionZoneOp.updateReencryptionFinish(dir, zoneIIP, status);
+ }
+
+ /**
+  * Resumes a re-encryption from the checkpoint stored in the zone status.
+  * Iteration restarts in the parent directory of the last checkpoint file,
+  * right after that file's local name, so every path that sorts earlier is
+  * skipped.
+  *
+  * @param zoneId inode id of the zone
+  * @param zs status of the zone, providing the checkpoint file and the
+  *           zone key version name
+  * @throws IOException if the checkpoint path cannot be resolved or the
+  *         iteration fails
+  * @throws InterruptedException if interrupted while iterating
+  */
+ private void restoreFromLastProcessedFile(final long zoneId,
+     final ZoneReencryptionStatus zs)
+     throws IOException, InterruptedException {
+   final String checkpointPath = zs.getLastCheckpointFile();
+   final INodesInPath checkpointIIP =
+       dir.getINodesInPath(checkpointPath, FSDirectory.DirOp.READ);
+   final INodeDirectory resumeDir = checkpointIIP.getLastINode().getParent();
+   final byte[] resumeAfter = checkpointIIP.getLastINode().getLocalNameBytes();
+   reencryptDir(resumeDir, zoneId, resumeAfter, zs.getEzKeyVersionName());
+ }
+
+ /**
+ * Iterate through all files directly inside parent, and recurse down
+ * directories. The listing is done in batch, and can optionally start after
+ * a position.
+ * <p>
+ * Each batch is then send to the threadpool, where KMS will be contacted and
+ * edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed
+ * from the threadpool.
+ * <p>
+ * The iteration of the inode tree is done in a depth-first fashion. But
+ * instead of holding all INodeDirectory's in memory on the fly, only the
+ * path components to the current inode is held. This is to reduce memory
+ * consumption.
+ *
+ * @param parent The inode id of parent directory
+ * @param zoneId Id of the EZ inode
+ * @param startAfter Full path of a file the re-encrypt should start after.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private void reencryptDir(final INodeDirectory parent, final long zoneId,
+ byte[] startAfter, final String ezKeyVerName)
+ throws IOException, InterruptedException {
+ List<byte[]> startAfters = new ArrayList<>();
+ if (parent == null) {
+ return;
+ }
+ INode curr = parent;
+ // construct startAfters all the way up to the zone inode.
+ startAfters.add(startAfter);
+ while (curr.getId() != zoneId) {
+ startAfters.add(0, curr.getLocalNameBytes());
+ curr = curr.getParent();
+ }
+ curr = reencryptDirInt(zoneId, parent, startAfters, ezKeyVerName);
+ while (!startAfters.isEmpty()) {
+ if (curr == null) {
+ // lock was reacquired, re-resolve path.
+ curr = resolvePaths(zoneId, startAfters);
+ }
+ curr = reencryptDirInt(zoneId, curr, startAfters, ezKeyVerName);
+ }
+ }
+
+ /**
+  * Resolve the cursor of re-encryption to an inode.
+  * <p>
+  * The parent of the lowest level startAfter is returned. If somewhere in the
+  * middle of startAfters changed, the parent of the lowest unchanged level is
+  * returned, and startAfters is truncated so that its last entry is the
+  * cursor within that parent.
+  *
+  * @param zoneId Id of the EZ inode.
+  * @param startAfters the cursor, represented by a list of path bytes. May
+  *                    be truncated by this method.
+  * @return the parent inode corresponding to the startAfters.
+  * @throws FileNotFoundException if the EZ node (furthest parent) is deleted.
+  * @throws IOException declared for callers; see FileNotFoundException.
+  */
+ private INode resolvePaths(final long zoneId, List<byte[]> startAfters)
+     throws IOException {
+   // If the readlock was reacquired, we need to resolve the paths again
+   // in case things have changed. If our cursor file/dir is changed,
+   // continue from the next one.
+   INode zoneNode = dir.getInode(zoneId);
+   if (zoneNode == null) {
+     throw new FileNotFoundException("Zone " + zoneId + " is deleted.");
+   }
+   INodeDirectory parent = zoneNode.asDirectory();
+   // The last startAfter does not need to be resolved, since the search for
+   // nextChild will cover that automatically; hence size() - 1.
+   for (int i = 0; i < startAfters.size() - 1; ++i) {
+     INode curr =
+         parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID);
+     if (curr == null) {
+       // inode at this level has changed. Keep startAfters[0..i] so that the
+       // cursor points to the next dir at this level, and drop ALL lower
+       // levels. (Removing one element per increment of i stopped halfway on
+       // deep paths and left a stale lower-level name as the cursor.)
+       startAfters.subList(i + 1, startAfters.size()).clear();
+       break;
+     }
+     parent = curr.asDirectory();
+   }
+   return parent;
+ }
+
+ /**
+ * Submit the current batch to the thread pool.
+ *
+ * @param zoneId Id of the EZ INode
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private void submitCurrentBatch(final long zoneId)
+ throws IOException, InterruptedException {
+ assert dir.hasReadLock();
+ if (currentBatch.isEmpty()) {
+ return;
+ }
+ ZoneSubmissionTracker zst = submissions.get(zoneId);
+ if (zst == null) {
+ zst = new ZoneSubmissionTracker();
+ submissions.put(zoneId, zst);
+ }
+ Future future = batchService
+ .submit(new EDEKReencryptCallable(zoneId, currentBatch, this));
+ zst.addTask(future);
+ LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
+ currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
+ currentBatch = new ReencryptionBatch(reencryptBatchSize);
+ // flip the pause flag if this is nth submission.
+ // The actual pause need to happen outside of the lock.
+ if (pauseAfterNthSubmission > 0) {
+ if (--pauseAfterNthSubmission == 0) {
+ shouldPauseForTesting = true;
+ }
+ }
+ }
+
+ /**
+  * A list of files (with their edek info) that is sent to the thread pool as
+  * one re-encryption task. Remembers the full path of the first file added,
+  * for logging.
+  */
+ final class ReencryptionBatch {
+   // First file's path, for logging purpose.
+   private String firstFilePath;
+   private final List<FileEdekInfo> batch;
+
+   /** Creates a batch sized for the configured re-encryption batch size. */
+   ReencryptionBatch() {
+     this(reencryptBatchSize);
+   }
+
+   /**
+    * @param initialCapacity initial capacity of the backing list
+    */
+   ReencryptionBatch(int initialCapacity) {
+     this.batch = new ArrayList<>(initialCapacity);
+   }
+
+   /**
+    * Appends a file to the batch. Asserts that the FSDirectory read lock is
+    * held.
+    *
+    * @param inode the file to add, must not be null
+    * @throws IOException declared by the FileEdekInfo constructor
+    */
+   void add(final INodeFile inode) throws IOException {
+     assert dir.hasReadLock();
+     Preconditions.checkNotNull(inode, "INodeFile is null");
+     final boolean isFirst = batch.isEmpty();
+     if (isFirst) {
+       firstFilePath = inode.getFullPathName();
+     }
+     final FileEdekInfo info = new FileEdekInfo(dir, inode);
+     batch.add(info);
+   }
+
+   /** @return full path of the first file added, or null if none was. */
+   String getFirstFilePath() {
+     return this.firstFilePath;
+   }
+
+   /** @return true if no file is in the batch. */
+   boolean isEmpty() {
+     return this.batch.isEmpty();
+   }
+
+   /** @return number of files in the batch. */
+   int size() {
+     return this.batch.size();
+   }
+
+   /** Removes all files from the batch. */
+   void clear() {
+     this.batch.clear();
+   }
+
+   /** @return the backing list itself (not a copy). */
+   List<FileEdekInfo> getBatch() {
+     return this.batch;
+   }
+ }
+
+ /**
+  * Task that asks the key provider to re-encrypt the edeks of one batch.
+  * It only talks to the KMS; no NN locks are held.
+  */
+ private static class EDEKReencryptCallable
+     implements Callable<ReencryptionTask> {
+   private final long zoneNodeId;
+   private final ReencryptionBatch batch;
+   private final ReencryptionHandler handler;
+
+   EDEKReencryptCallable(final long zoneId,
+       final ReencryptionBatch currentBatch, final ReencryptionHandler rh) {
+     this.zoneNodeId = zoneId;
+     this.batch = currentBatch;
+     this.handler = rh;
+   }
+
+   /**
+    * Re-encrypts the batch and wraps the outcome in a ReencryptionTask. An
+    * empty batch yields a task with 0 failures without contacting the KMS;
+    * a failed batch counts every file in it as a failure.
+    */
+   @Override
+   public ReencryptionTask call() {
+     LOG.info("Processing batched re-encryption for zone {}, batch size {},"
+         + " start:{}", zoneNodeId, batch.size(), batch.getFirstFilePath());
+     if (batch.isEmpty()) {
+       return new ReencryptionTask(zoneNodeId, 0, batch);
+     }
+     final Stopwatch timer = new Stopwatch().start();
+
+     final boolean succeeded = reencryptEdeks();
+     final int failures = succeeded ? 0 : batch.size();
+     final String outcome = succeeded ? "Completed" : "Failed to";
+     LOG.info("{} re-encrypting one batch of {} edeks from KMS,"
+         + " time consumed: {}, start: {}.", outcome,
+         batch.size(), timer.stop(), batch.getFirstFilePath());
+     return new ReencryptionTask(zoneNodeId, failures, batch);
+   }
+
+   /**
+    * Sends the existing edeks of the batch to the provider and stores the
+    * results back on the batch entries, in the same order.
+    *
+    * @return false if the provider call failed, true otherwise
+    */
+   private boolean reencryptEdeks() {
+     // communicate with the kms out of lock
+     final List<FileEdekInfo> entries = batch.getBatch();
+     final List<EncryptedKeyVersion> edeks = new ArrayList<>(batch.size());
+     for (FileEdekInfo entry : entries) {
+       edeks.add(entry.getExistingEdek());
+     }
+     // provider already has LoadBalancingKMSClientProvider's retries. If that
+     // fails, just fail this callable.
+     try {
+       handler.ezManager.getProvider().reencryptEncryptedKeys(edeks);
+       EncryptionFaultInjector.getInstance().reencryptEncryptedKeys();
+     } catch (GeneralSecurityException | IOException ex) {
+       LOG.warn("Failed to re-encrypt one batch of {} edeks, start:{}",
+           batch.size(), batch.getFirstFilePath(), ex);
+       return false;
+     }
+     // Entry k receives edek k.
+     int next = 0;
+     for (FileEdekInfo entry : entries) {
+       assert next < edeks.size();
+       entry.setEdek(edeks.get(next));
+       next++;
+     }
+     return true;
+   }
+ }
+
+ /**
+ * Iterates the parent directory, and add direct children files to
+ * current batch. If batch size meets configured threshold, a Callable
+ * is created and sent to the thread pool, which will communicate to the KMS
+ * to get new edeks.
+ * <p>
+ * Locks could be released and reacquired when a Callable is created.
+ * <p>
+ * startAfters is updated in place: one level is appended when descending
+ * into a sub-directory, and the last level is dropped when the directory is
+ * finished.
+ *
+ * @param zoneId Id of the EZ INode
+ * @param curr inode to continue from; its directory (itself if it is a
+ * directory, its parent otherwise) is the one iterated
+ * @param startAfters the cursor; its last entry is the child name to start
+ * after in the iterated directory
+ * @param ezKeyVerName key version name passed to the per-file check
+ * @return The inode which was just processed, if lock is held in the entire
+ * process. Null if lock is released.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private INode reencryptDirInt(final long zoneId, INode curr,
+ List<byte[]> startAfters, final String ezKeyVerName)
+ throws IOException, InterruptedException {
+ assert dir.hasReadLock();
+ assert dir.getFSNamesystem().hasReadLock();
+ Preconditions.checkNotNull(curr, "Current inode can't be null");
+ checkZoneReady(zoneId);
+ final INodeDirectory parent =
+ curr.isDirectory() ? curr.asDirectory() : curr.getParent();
+ ReadOnlyList<INode> children =
+ parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Re-encrypting directory {}", parent.getFullPathName());
+ }
+
+ final byte[] startAfter = startAfters.get(startAfters.size() - 1);
+ boolean lockReleased = false;
+ for (int i = INodeDirectory.nextChild(children, startAfter);
+ i < children.size(); ++i) {
+ final INode inode = children.get(i);
+ if (!reencryptINode(inode, ezKeyVerName)) {
+ // inode wasn't added for re-encryption. Recurse down if it's a dir,
+ // skip otherwise.
+ if (!inode.isDirectory()) {
+ continue;
+ }
+ if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
+ // nested EZ, ignore.
+ LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
+ inode.getFullPathName(), inode.getId());
+ continue;
+ }
+ // add 1 level to the depth-first search.
+ curr = inode;
+ if (!startAfters.isEmpty()) {
+ // Replace this level's cursor with the sub-directory's name, so that
+ // iteration resumes after it once the sub-directory is done.
+ startAfters.remove(startAfters.size() - 1);
+ startAfters.add(curr.getLocalNameBytes());
+ }
+ startAfters.add(HdfsFileStatus.EMPTY_NAME);
+ return lockReleased ? null : curr;
+ }
+ if (currentBatch.size() >= reencryptBatchSize) {
+ // Remember where we are before the lock is dropped.
+ final byte[] currentStartAfter = inode.getLocalNameBytes();
+ final String parentPath = parent.getFullPathName();
+ submitCurrentBatch(zoneId);
+ lockReleased = true;
+ readUnlock();
+ try {
+ throttle();
+ checkPauseForTesting();
+ } finally {
+ readLock();
+ }
+ checkZoneReady(zoneId);
+
+ // Things could have changed when the lock was released.
+ // Re-resolve the parent inode.
+ FSPermissionChecker pc = dir.getPermissionChecker();
+ INode newParent =
+ dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
+ .getLastINode();
+ if (newParent == null || !newParent.equals(parent)) {
+ // parent dir is deleted or recreated. We're done.
+ return null;
+ }
+ children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
+ // -1 to counter the ++ on the for loop
+ i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
+ }
+ }
+ // Successfully finished this dir, adjust pointers to 1 level up, and
+ // startAfter this dir.
+ startAfters.remove(startAfters.size() - 1);
+ if (!startAfters.isEmpty()) {
+ startAfters.remove(startAfters.size() - 1);
+ startAfters.add(curr.getLocalNameBytes());
+ }
+ curr = curr.getParent();
+ return lockReleased ? null : curr;
+ }
+
+ /**
+ * Takes the FSNamesystem read lock, then the FSDirectory read lock, and
+ * starts the stopwatch that measures time spent holding them.
+ */
+ private void readLock() {
+ dir.getFSNamesystem().readLock();
+ dir.readLock();
+ throttleTimerLocked.start();
+ }
+
+ /**
+ * Releases the FSDirectory read lock, then the FSNamesystem read lock (the
+ * reverse of the order in readLock()), and stops the locked-time stopwatch.
+ */
+ private void readUnlock() {
+ dir.readUnlock();
+ dir.getFSNamesystem().readUnlock("reencryptHandler");
+ throttleTimerLocked.stop();
+ }
+
+ /**
+  * Throttles the ReencryptionHandler in 3 aspects:
+  * 1. Prevents generating more Callables than the CPU could possibly handle.
+  * 2. Prevents generating more Callables than the ReencryptionUpdater can
+  *    handle, under its own throttling
+  * 3. Prevents contending FSN/FSD read locks. This is done based on the
+  *    DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY configuration.
+  * <p>
+  * Item 1 and 2 are to control NN heap usage.
+  * <p>
+  * Both throttle stopwatches are reset before returning from step 3.
+  *
+  * @throws InterruptedException if interrupted while sleeping
+  */
+ @VisibleForTesting
+ void throttle() throws InterruptedException {
+   // 1.
+   final int numCores = Runtime.getRuntime().availableProcessors();
+   if (taskQueue.size() >= numCores) {
+     LOG.debug("Re-encryption handler throttling because queue size {} is"
+         + " larger than number of cores {}", taskQueue.size(), numCores);
+     while (taskQueue.size() >= numCores) {
+       Thread.sleep(100);
+     }
+   }
+
+   // 2. if tasks are piling up on the updater, don't create new callables
+   // until the queue size goes down.
+   final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
+   int totalTasks = countTrackedTasks();
+   if (totalTasks >= maxTasksPiled) {
+     LOG.debug("Re-encryption handler throttling because total tasks pending"
+         + " re-encryption updater is {}", totalTasks);
+     while (totalTasks >= maxTasksPiled) {
+       Thread.sleep(500);
+       totalTasks = countTrackedTasks();
+     }
+   }
+
+   // 3.
+   if (throttleLimitHandlerRatio >= 1.0) {
+     return;
+   }
+   final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
+       * throttleLimitHandlerRatio);
+   final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
+         + " throttleTimerAll:{}", expect, actual,
+         throttleTimerAll.now(TimeUnit.MILLISECONDS));
+   }
+   if (expect - actual < 0) {
+     // in case throttleLimitHandlerRatio is very small, expect will be 0.
+     // so sleepMs should not be calculated from expect, to really meet the
+     // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
+     // should be 1000 - throttleTimerAll.now()
+     final long sleepMs =
+         (long) (actual / throttleLimitHandlerRatio) - throttleTimerAll
+             .now(TimeUnit.MILLISECONDS);
+     LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
+     // throttleTimerAll keeps running between the check above and this
+     // point, so sleepMs can come out <= 0. Thread.sleep rejects negative
+     // values with IllegalArgumentException, so only sleep when positive.
+     if (sleepMs > 0) {
+       Thread.sleep(sleepMs);
+     }
+   }
+   throttleTimerAll.reset().start();
+   throttleTimerLocked.reset();
+ }
+
+ /** Sums the number of tracked tasks over all zone submission trackers. */
+ private int countTrackedTasks() {
+   int total = 0;
+   for (ZoneSubmissionTracker zst : submissions.values()) {
+     total += zst.getTasks().size();
+   }
+   return total;
+ }
+
+ /**
+  * Process an Inode for re-encryption. Add to current batch if it's a file,
+  * no-op otherwise. Asserts that the FSDirectory read lock is held.
+  *
+  * @param inode the inode
+  * @param ezKeyVerName the zone key version name to compare the file's edek
+  *                     key version name against
+  * @return true if inode is added to currentBatch and should be re-encrypted.
+  * false otherwise: could be inode is not a file, the file has no
+  * encryption info, or inode's edek's key version is not changed.
+  * @throws IOException
+  * @throws InterruptedException
+  */
+ private boolean reencryptINode(final INode inode, final String ezKeyVerName)
+     throws IOException, InterruptedException {
+   // The result of hasReadLock() used to be discarded; it was meant to be
+   // asserted, like the other lock checks in this class.
+   assert dir.hasReadLock();
+   if (LOG.isTraceEnabled()) {
+     LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
+   }
+   if (!inode.isFile()) {
+     return false;
+   }
+   FileEncryptionInfo feInfo = FSDirEncryptionZoneOp
+       .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
+   if (feInfo == null) {
+     LOG.warn("File {} skipped re-encryption because it is not encrypted! "
+         + "This is very likely a bug.", inode.getId());
+     return false;
+   }
+   if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) {
+     // Already encrypted with the target key version, nothing to do.
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("File {} skipped re-encryption because edek's key version"
+           + " name is not changed.", inode.getFullPathName());
+     }
+     return false;
+   }
+   currentBatch.add(inode.asFile());
+   return true;
+ }
+
+ /**
+  * Verifies that re-encryption of the zone may proceed, throwing otherwise.
+  * The checks, in order:
+  * 1. The zone still has a re-encryption status.
+  * 2. The re-encryption has not been canceled.
+  * 3. The NN is not in safe mode.
+  * 4. The NN accepts WRITE operations.
+  *
+  * @param zoneId inode id of the zone
+  * @throws IOException if zone status does not exist / is cancelled, or if
+  *         NN is not ready for write.
+  */
+ void checkZoneReady(final long zoneId)
+     throws RetriableException, SafeModeException, IOException {
+   final ZoneReencryptionStatus status =
+       getReencryptionStatus().getZoneStatus(zoneId);
+   if (status == null) {
+     throw new IOException("Zone " + zoneId + " status cannot be found.");
+   }
+   if (status.isCanceled()) {
+     throw new IOException("Re-encryption is canceled for zone " + zoneId);
+   }
+   final FSNamesystem fsn = dir.getFSNamesystem();
+   fsn.checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt.");
+   // re-encryption should be cancelled when NN goes to standby. Just
+   // double checking for sanity.
+   fsn.checkOperation(NameNode.OperationCategory.WRITE);
+ }
+
+ /**
+  * Called when a new zone is submitted for re-encryption. Notifies one
+  * thread waiting on this handler's monitor, so that the background thread
+  * does not have to sit out the rest of its
+  * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY wait.
+  */
+ void notifyNewSubmission() {
+   synchronized (this) {
+     LOG.debug("Notifying handler for new re-encryption command.");
+     notify();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
new file mode 100644
index 0000000..690a0e9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
+import org.apache.hadoop.hdfs.server.namenode.ReencryptionHandler.ReencryptionBatch;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY;
+
+/**
+ * Class for finalizing re-encrypt EDEK operations, by updating file xattrs with
+ * edeks returned from reencryption.
+ * <p>
+ * The tasks are submitted by ReencryptionHandler.
+ * <p>
+ * It is assumed only 1 Updater will be running, since updating file xattrs
+ * requires namespace write lock, and performance gain from multi-threading
+ * is limited.
+ */
+@InterfaceAudience.Private
+public final class ReencryptionUpdater implements Runnable {
+
+ // Logger used by the updater and by its nested helper classes.
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ReencryptionUpdater.class);
+
+ // Test hook: while this is set, checkPauseForTesting() blocks the updater
+ // thread.
+ private volatile boolean shouldPauseForTesting = false;
+ // Test hook: number of finished futures of zone pauseZoneId at which the
+ // updater pauses itself. 0 means no such pause is requested.
+ private volatile int pauseAfterNthCheckpoint = 0;
+ private volatile long pauseZoneId = 0;
+
+ // Throttling state. The ratio is read from configuration in the
+ // constructor. throttleTimerAll is started in run(); throttleTimerLocked
+ // runs while the FSN write lock is held. Both are reset at the end of
+ // throttle().
+ private double throttleLimitRatio;
+ private final StopWatch throttleTimerAll = new StopWatch();
+ private final StopWatch throttleTimerLocked = new StopWatch();
+
+ // Milliseconds to sleep before retrying a task that failed with a
+ // RetriableException or SafeModeException.
+ private volatile long faultRetryInterval = 60000;
+
+  /**
+   * Tracks the re-encryption submissions of one zone: every future submitted
+   * for the zone, plus counters describing how far those futures have been
+   * processed.
+   */
+  static final class ZoneSubmissionTracker {
+    // Set once the submitter has no more tasks to add for the zone.
+    private boolean submissionDone = false;
+    // Raw Future is kept on purpose: getTasks() and addTask() expose it.
+    private LinkedList<Future> tasks = new LinkedList<>();
+    private int numCheckpointed = 0;
+    private int numFutureDone = 0;
+
+    ZoneSubmissionTracker() {
+    }
+
+    /** @return the live list of futures still tracked for the zone. */
+    LinkedList<Future> getTasks() {
+      return tasks;
+    }
+
+    /** Cancels every tracked future, interrupting the ones that are running. */
+    void cancelAllTasks() {
+      if (tasks.isEmpty()) {
+        return;
+      }
+      LOG.info("Cancelling {} re-encryption tasks", tasks.size());
+      for (final Future pending : tasks) {
+        pending.cancel(true);
+      }
+    }
+
+    /** Appends a newly submitted future to the end of the tracked list. */
+    void addTask(final Future task) {
+      tasks.add(task);
+    }
+
+    /** @return true once submission is done and no tracked future remains. */
+    private boolean isCompleted() {
+      return submissionDone && tasks.isEmpty();
+    }
+
+    /** Records that no further tasks will be submitted for the zone. */
+    void setSubmissionDone() {
+      submissionDone = true;
+    }
+  }
+
+  /**
+   * The task for one batch of a re-encryption command, together with the
+   * statistics describing how far that single batch has been executed.
+   */
+  static final class ReencryptionTask {
+    private final long zoneId;
+    private final ReencryptionBatch batch;
+    private boolean processed;
+    private int numFilesUpdated;
+    private int numFailures;
+    private String lastFile;
+
+    /**
+     * @param id       inode id of the zone the batch belongs to.
+     * @param failures initial failure count of the batch.
+     * @param theBatch the batch of files covered by this task.
+     */
+    ReencryptionTask(final long id, final int failures,
+        final ReencryptionBatch theBatch) {
+      this.zoneId = id;
+      this.batch = theBatch;
+      this.processed = false;
+      this.numFilesUpdated = 0;
+      this.numFailures = failures;
+      this.lastFile = null;
+    }
+  }
+
+  /**
+   * Re-encryption details of a single file: the inode id, the edek the file
+   * had when this object was created, and the new edek produced by
+   * re-encryption.
+   * <p>
+   * The object is expected to be created while the dir lock is held, for an
+   * inode that is valid and encrypted at that moment.
+   * <p>
+   * The namespace may change while re-encryption is in flight; if the inode
+   * has changed by then, its re-encryption is skipped.
+   */
+  static final class FileEdekInfo {
+    private final long inodeId;
+    private final EncryptedKeyVersion existingEdek;
+    private EncryptedKeyVersion edek = null;
+
+    /**
+     * Captures the inode id and current edek of the given file.
+     *
+     * @param dir   the directory the inode belongs to.
+     * @param inode the encrypted file; must not be null.
+     * @throws IOException if the file encryption info cannot be read.
+     */
+    FileEdekInfo(FSDirectory dir, INodeFile inode) throws IOException {
+      assert dir.hasReadLock();
+      Preconditions.checkNotNull(inode, "INodeFile is null");
+      this.inodeId = inode.getId();
+      final INodesInPath iip = INodesInPath.fromINode(inode);
+      final FileEncryptionInfo currentInfo =
+          FSDirEncryptionZoneOp.getFileEncryptionInfo(dir, iip);
+      Preconditions.checkNotNull(currentInfo,
+          "FileEncryptionInfo is null for " + inodeId);
+      this.existingEdek = EncryptedKeyVersion.createForDecryption(
+          currentInfo.getKeyName(), currentInfo.getEzKeyVersionName(),
+          currentInfo.getIV(), currentInfo.getEncryptedDataEncryptionKey());
+    }
+
+    /** @return id of the file inode. */
+    long getInodeId() {
+      return inodeId;
+    }
+
+    /** @return the edek the file had when this object was created. */
+    EncryptedKeyVersion getExistingEdek() {
+      return existingEdek;
+    }
+
+    /** Stores the re-encrypted edek; it must not be null. */
+    void setEdek(final EncryptedKeyVersion ekv) {
+      assert ekv != null;
+      this.edek = ekv;
+    }
+  }
+
+  /**
+   * Test hook: requests the updater to pause, and wakes up a thread waiting
+   * on this updater's monitor.
+   */
+  @VisibleForTesting
+  synchronized void pauseForTesting() {
+    this.shouldPauseForTesting = true;
+    LOG.info("Pausing re-encrypt updater for testing.");
+    this.notify();
+  }
+
+  /**
+   * Test hook: clears the pause request, and wakes up a thread waiting on
+   * this updater's monitor so that a paused updater can continue.
+   */
+  @VisibleForTesting
+  synchronized void resumeForTesting() {
+    this.shouldPauseForTesting = false;
+    LOG.info("Resuming re-encrypt updater for testing.");
+    this.notify();
+  }
+
+  /**
+   * Test hook: asks the updater to pause itself once the number of finished
+   * futures of the given zone equals {@code count}.
+   *
+   * @param zoneId inode id of the zone to watch.
+   * @param count  number of finished futures at which to pause; 0 requests
+   *               nothing.
+   */
+  @VisibleForTesting
+  void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
+    assert pauseAfterNthCheckpoint == 0;
+    // Publish the zone id before the count. checkPauseForTesting() reads
+    // pauseAfterNthCheckpoint first and pauseZoneId second, so writing the
+    // volatile count last guarantees the updater thread never pairs a new
+    // count with a stale zone id.
+    pauseZoneId = zoneId;
+    pauseAfterNthCheckpoint = count;
+  }
+
+  private final FSDirectory dir;
+  private final CompletionService<ReencryptionTask> batchService;
+  private final ReencryptionHandler handler;
+
+  /**
+   * Creates the updater and reads its throttle ratio from configuration.
+   *
+   * @param fsd     the directory whose file xattrs are updated.
+   * @param service completion service that yields finished re-encryption
+   *                tasks.
+   * @param rh      the handler that submits the tasks and owns the trackers.
+   * @param conf    configuration supplying the updater throttle ratio.
+   * @throws IllegalArgumentException if the configured ratio is not positive.
+   */
+  ReencryptionUpdater(final FSDirectory fsd,
+      final CompletionService<ReencryptionTask> service,
+      final ReencryptionHandler rh, final Configuration conf) {
+    this.dir = fsd;
+    this.batchService = service;
+    this.handler = rh;
+    final double ratio = conf.getDouble(
+        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY,
+        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT);
+    Preconditions.checkArgument(ratio > 0.0f,
+        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY
+            + " is not positive.");
+    this.throttleLimitRatio = ratio;
+  }
+
+  /**
+   * Called by the submission thread to indicate that all tasks of a zone have
+   * been submitted. If no task has been submitted for the zone at all, the
+   * re-encryption is considered complete.
+   *
+   * @param zoneId Id of the zone inode.
+   * @throws IOException declared for callers; not raised by the visible code
+   *                     unless the handler calls do.
+   * @throws InterruptedException declared for callers; not raised by the
+   *                     visible code unless the handler calls do.
+   */
+  void markZoneSubmissionDone(final long zoneId)
+      throws IOException, InterruptedException {
+    final ZoneSubmissionTracker tracker = handler.getTracker(zoneId);
+    if (tracker == null) {
+      // Caller thinks submission is done, but no tasks were submitted -
+      // meaning no files in the EZ need to be re-encrypted. Complete directly.
+      handler.addDummyTracker(zoneId);
+      return;
+    }
+    tracker.setSubmissionDone();
+  }
+
+  /**
+   * Main loop of the updater thread: repeatedly takes one finished task and
+   * processes it. The loop ends when the thread is interrupted or when an
+   * unexpected Throwable is raised; an IOException is only logged.
+   */
+  @Override
+  public void run() {
+    throttleTimerAll.start();
+    for (;;) {
+      try {
+        // Assuming single-threaded updater.
+        takeAndProcessTasks();
+      } catch (InterruptedException ie) {
+        LOG.warn("Re-encryption updater thread interrupted. Exiting.");
+        // Restore the interrupt flag for whoever owns this thread.
+        Thread.currentThread().interrupt();
+        return;
+      } catch (IOException ioe) {
+        LOG.warn("Re-encryption updater thread exception.", ioe);
+      } catch (Throwable t) {
+        LOG.error("Re-encryption updater thread exiting.", t);
+        return;
+      }
+    }
+  }
+
+  /**
+   * Applies a completed ReencryptionTask to the namespace. Every inode id of
+   * the batch is resolved to an INode again; entries whose inode is gone, or
+   * whose encryption info no longer matches what was re-encrypted, are
+   * removed from the batch and skipped.
+   * <p>
+   * Only file xattrs are updated by this method. Re-encryption progress is
+   * not updated. A batch that is empty or already has failures is left
+   * untouched and just marked as processed.
+   *
+   * @param zoneNodePath full path of the EZ inode.
+   * @param task the completed task.
+   * @throws IOException if reading or writing file encryption info fails.
+   * @throws InterruptedException declared by the signature.
+   */
+  private void processTaskEntries(final String zoneNodePath,
+      final ReencryptionTask task) throws IOException, InterruptedException {
+    assert dir.hasWriteLock();
+    if (task.batch.isEmpty() || task.numFailures != 0) {
+      // Nothing to apply; the task only needs to be marked as processed.
+      task.processed = true;
+      return;
+    }
+
+    LOG.debug("Updating file xattrs for re-encrypting zone {}, starting at {}",
+        zoneNodePath, task.batch.getFirstFilePath());
+    final Iterator<FileEdekInfo> entries = task.batch.getBatch().iterator();
+    while (entries.hasNext()) {
+      final FileEdekInfo entry = entries.next();
+      // Resolve the inode again, and skip it if it no longer exists.
+      LOG.trace("Updating {} for re-encryption.", entry.getInodeId());
+      final INode inode = dir.getInode(entry.getInodeId());
+      if (inode == null) {
+        LOG.debug("INode {} doesn't exist, skipping re-encrypt.",
+            entry.getInodeId());
+        // Also remove it from the batch so that it is not saved later.
+        entries.remove();
+        continue;
+      }
+
+      // Cautiously check the file encryption info, and only update when the
+      // file is certainly still using the edek that was re-encrypted.
+      Preconditions.checkNotNull(entry.edek);
+      final FileEncryptionInfo fei = FSDirEncryptionZoneOp
+          .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
+      final boolean sameKey =
+          fei.getKeyName().equals(entry.edek.getEncryptionKeyName());
+      if (!sameKey) {
+        LOG.debug("Inode {} EZ key changed, skipping re-encryption.",
+            entry.getInodeId());
+        entries.remove();
+        continue;
+      }
+      final boolean sameKeyVersion = fei.getEzKeyVersionName()
+          .equals(entry.edek.getEncryptionKeyVersionName());
+      if (sameKeyVersion) {
+        LOG.debug(
+            "Inode {} EZ key version unchanged, skipping re-encryption.",
+            entry.getInodeId());
+        entries.remove();
+        continue;
+      }
+      final boolean sameExistingEdek = Arrays.equals(
+          fei.getEncryptedDataEncryptionKey(),
+          entry.existingEdek.getEncryptedKeyVersion().getMaterial());
+      if (!sameExistingEdek) {
+        LOG.debug("Inode {} existing edek changed, skipping re-encryption",
+            entry.getInodeId());
+        entries.remove();
+        continue;
+      }
+
+      // Same cipher suite, protocol version and key name; new edek material,
+      // IV and key version name.
+      final FileEncryptionInfo newFei = new FileEncryptionInfo(
+          fei.getCipherSuite(), fei.getCryptoProtocolVersion(),
+          entry.edek.getEncryptedKeyVersion().getMaterial(),
+          entry.edek.getEncryptedKeyIv(), fei.getKeyName(),
+          entry.edek.getEncryptionKeyVersionName());
+      final INodesInPath iip = INodesInPath.fromINode(inode);
+      FSDirEncryptionZoneOp
+          .setFileEncryptionInfo(dir, iip, newFei, XAttrSetFlag.REPLACE);
+      task.lastFile = iip.getPath();
+      ++task.numFilesUpdated;
+    }
+
+    LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption,"
+        + " starting:{}.", task.numFilesUpdated, task.batch.size(),
+        zoneNodePath, task.batch.getFirstFilePath());
+    task.processed = true;
+  }
+
+ /**
+ * Walks the zone's submitted tasks from the head of the list and records
+ * re-encryption progress for every leading task that is done and processed.
+ * The checkpoint indicates all files before it are done re-encryption, so
+ * the walk stops at the first task that is not yet done or not yet processed,
+ * even if later tasks have already finished.
+ * <p>
+ * Each checkpointed task is removed from the tracker. If that leaves the
+ * tracker completed, the result of handler.completeReencryption() is returned
+ * instead of the checkpoint xattr.
+ *
+ * @param zoneNode the EZ inode.
+ * @param tracker the zone submission tracker.
+ * @return the list containing the last checkpointed xattr. Empty if
+ * no checkpoint happened.
+ * @throws ExecutionException if a finished future completed exceptionally.
+ * @throws IOException declared by the signature. NOTE(review): presumably
+ * raised by handler.completeReencryption() - confirm.
+ * @throws InterruptedException if interrupted while reading a future.
+ */
+ private List<XAttr> processCheckpoints(final INode zoneNode,
+ final ZoneSubmissionTracker tracker)
+ throws ExecutionException, IOException, InterruptedException {
+ assert dir.hasWriteLock();
+ final long zoneId = zoneNode.getId();
+ final String zonePath = zoneNode.getFullPathName();
+ final ZoneReencryptionStatus status =
+ handler.getReencryptionStatus().getZoneStatus(zoneId);
+ assert status != null;
+ // always start from the beginning, because the checkpoint means all files
+ // before it are re-encrypted.
+ final LinkedList<Future> tasks = tracker.getTasks();
+ // Holds at most one element: the xattr of the latest successful checkpoint.
+ final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+ ListIterator<Future> iter = tasks.listIterator();
+ while (iter.hasNext()) {
+ Future<ReencryptionTask> curr = iter.next();
+ if (!curr.isDone() || !curr.get().processed) {
+ // still has earlier tasks not completed, skip here.
+ break;
+ }
+ ReencryptionTask task = curr.get();
+ LOG.debug("Updating re-encryption checkpoint with completed task."
+ + " last: {} size:{}.", task.lastFile, task.batch.size());
+ assert zoneId == task.zoneId;
+ try {
+ final XAttr xattr = FSDirEncryptionZoneOp
+ .updateReencryptionProgress(dir, zoneNode, status, task.lastFile,
+ task.numFilesUpdated, task.numFailures);
+ // Only the most recent checkpoint is kept for the caller.
+ xAttrs.clear();
+ xAttrs.add(xattr);
+ } catch (IOException ie) {
+ // A failed progress update is logged and counted, but the task is still
+ // checkpointed and removed below.
+ LOG.warn("Failed to update re-encrypted progress to xattr for zone {}",
+ zonePath, ie);
+ ++task.numFailures;
+ }
+ ++tracker.numCheckpointed;
+ iter.remove();
+ }
+ if (tracker.isCompleted()) {
+ LOG.debug("Removed re-encryption tracker for zone {} because it completed"
+ + " with {} tasks.", zonePath, tracker.numCheckpointed);
+ return handler.completeReencryption(zoneNode);
+ }
+ return xAttrs;
+ }
+
+  /**
+   * Takes the next finished future from the completion service and applies
+   * its task under the FSN write lock.
+   * <p>
+   * A cancelled future is skipped. A task that fails with a
+   * RetriableException or SafeModeException is retried, sleeping
+   * faultRetryInterval milliseconds between attempts, until it succeeds or
+   * fails differently. Any other IOException marks the task as failed and
+   * processed.
+   *
+   * @throws Exception InterruptedException if interrupted while waiting or
+   *                   sleeping; ExecutionException if the taken future
+   *                   completed exceptionally; otherwise whatever
+   *                   processTask() lets through.
+   */
+  private void takeAndProcessTasks() throws Exception {
+    final Future<ReencryptionTask> completed = batchService.take();
+    throttle();
+    checkPauseForTesting();
+    if (completed.isCancelled()) {
+      // Must be checked before get(): Future.get() on a cancelled future
+      // throws CancellationException, which would end the updater thread.
+      LOG.debug("Skipped a canceled re-encryption task");
+      return;
+    }
+    final ReencryptionTask task = completed.get();
+
+    boolean shouldRetry;
+    do {
+      dir.getFSNamesystem().writeLock();
+      try {
+        throttleTimerLocked.start();
+        processTask(task);
+        shouldRetry = false;
+      } catch (RetriableException | SafeModeException re) {
+        // Keep retrying until succeed.
+        LOG.info("Exception when processing re-encryption task for zone {}, "
+            + "retrying...", task.zoneId, re);
+        shouldRetry = true;
+      } catch (IOException ioe) {
+        LOG.warn("Failure processing re-encryption task for zone {}",
+            task.zoneId, ioe);
+        ++task.numFailures;
+        task.processed = true;
+        shouldRetry = false;
+      } finally {
+        dir.getFSNamesystem().writeUnlock("reencryptUpdater");
+        throttleTimerLocked.stop();
+      }
+      // logSync regardless, to prevent edit log buffer overflow triggering
+      // logSync inside FSN writelock.
+      dir.getEditLog().logSync();
+      if (shouldRetry) {
+        // Back off only after the FSN write lock has been released, so the
+        // namesystem is not blocked for the whole retry interval.
+        Thread.sleep(faultRetryInterval);
+      }
+    } while (shouldRetry);
+  }
+
+ /**
+ * Processes one finished task: under the dir write lock it checks the zone
+ * is ready, updates the file xattrs of the batch and advances the zone
+ * checkpoints; after releasing the lock it saves the file xattrs of the
+ * batch and logs the checkpoint xattrs, if any, to the edit log.
+ * <p>
+ * Returns without doing anything further when the zone inode no longer
+ * exists.
+ *
+ * @param task the finished task to apply.
+ * @throws InterruptedException declared by the signature.
+ * @throws ExecutionException if a tracked future completed exceptionally.
+ * @throws IOException if the zone is not ready, or if an update fails.
+ */
+ private void processTask(ReencryptionTask task)
+ throws InterruptedException, ExecutionException, IOException {
+ final List<XAttr> xAttrs;
+ final String zonePath;
+ dir.writeLock();
+ try {
+ handler.checkZoneReady(task.zoneId);
+ final INode zoneNode = dir.getInode(task.zoneId);
+ if (zoneNode == null) {
+ // ez removed.
+ return;
+ }
+ zonePath = zoneNode.getFullPathName();
+ LOG.info("Processing returned re-encryption task for zone {}({}), "
+ + "batch size {}, start:{}", zonePath, task.zoneId,
+ task.batch.size(), task.batch.getFirstFilePath());
+ final ZoneSubmissionTracker tracker =
+ handler.getTracker(zoneNode.getId());
+ Preconditions.checkNotNull(tracker, "zone tracker not found " + zonePath);
+ // Counted before the entries are applied; checkPauseForTesting() compares
+ // this counter against the requested pause point.
+ tracker.numFutureDone++;
+ EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask();
+ processTaskEntries(zonePath, task);
+ EncryptionFaultInjector.getInstance().reencryptUpdaterProcessCheckpoint();
+ xAttrs = processCheckpoints(zoneNode, tracker);
+ } finally {
+ dir.writeUnlock();
+ }
+ // The dir write lock has been released at this point.
+ FSDirEncryptionZoneOp.saveFileXAttrsForBatch(dir, task.batch.getBatch());
+ if (!xAttrs.isEmpty()) {
+ dir.getEditLog().logSetXAttrs(zonePath, xAttrs, false);
+ }
+ }
+
+ private synchronized void checkPauseForTesting() throws InterruptedException {
+ assert !dir.hasWriteLock();
+ assert !dir.getFSNamesystem().hasWriteLock();
+ if (pauseAfterNthCheckpoint != 0) {
+ ZoneSubmissionTracker tracker =
+ handler.unprotectedGetTracker(pauseZoneId);
+ if (tracker != null) {
+ if (tracker.numFutureDone == pauseAfterNthCheckpoint) {
+ shouldPauseForTesting = true;
+ pauseAfterNthCheckpoint = 0;
+ }
+ }
+ }
+ while (shouldPauseForTesting) {
+ LOG.info("Sleeping in the re-encryption updater for unit test.");
+ wait();
+ LOG.info("Continuing re-encryption updater after pausing.");
+ }
+ }
+
+  /**
+   * Throttles the ReencryptionUpdater so that it does not contend for the
+   * FSN/FSD write locks. The time spent holding the lock since the previous
+   * call (throttleTimerLocked) is limited to throttleLimitRatio of the wall
+   * time elapsed over the same period (throttleTimerAll); when the ratio is
+   * exceeded, the thread sleeps long enough to get back to it. A configured
+   * ratio of 1.0 or more disables throttling.
+   *
+   * @throws InterruptedException if interrupted while sleeping.
+   */
+  private void throttle() throws InterruptedException {
+    if (throttleLimitRatio >= 1.0) {
+      return;
+    }
+
+    final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
+        * throttleLimitRatio);
+    final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Re-encryption updater throttling expect: {}, actual: {},"
+          + " throttleTimerAll:{}", expect, actual,
+          throttleTimerAll.now(TimeUnit.MILLISECONDS));
+    }
+    if (expect - actual < 0) {
+      // When throttleLimitRatio is very small, expect is truncated to 0, so
+      // sleepMs is not derived from expect, in order to really meet the
+      // ratio. E.g. with ratio 0.001, expect = 0 and actual = 1, sleepMs
+      // should be 1000 - throttleTimerAll.now().
+      final long sleepMs = (long) (actual / throttleLimitRatio)
+          - throttleTimerAll.now(TimeUnit.MILLISECONDS);
+      // throttleTimerAll keeps running between the read used for expect and
+      // the read above, so the difference can reach zero or go negative.
+      // Thread.sleep rejects negative values with IllegalArgumentException,
+      // which would terminate the updater thread, so only sleep when there
+      // is time left to wait.
+      if (sleepMs > 0) {
+        LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
+        Thread.sleep(sleepMs);
+      }
+    }
+    throttleTimerAll.reset().start();
+    throttleTimerLocked.reset();
+  }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org