Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/09/24 17:46:48 UTC

hadoop git commit: HDFS-7529. Consolidate encryption zone related implementation into a single class. Contributed by Rakesh R.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 e5ba1a052 -> 13bdc20d2


HDFS-7529. Consolidate encryption zone related implementation into a single class. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13bdc20d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13bdc20d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13bdc20d

Branch: refs/heads/branch-2
Commit: 13bdc20d2bd6127d9d64883f301122a812c12be8
Parents: e5ba1a0
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Sep 24 08:34:32 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Thu Sep 24 08:46:44 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   5 +-
 .../hdfs/server/namenode/FSDirConcatOp.java     |   2 +-
 .../server/namenode/FSDirEncryptionZoneOp.java  | 303 +++++++++++++++++++
 .../server/namenode/FSDirStatAndListingOp.java  |  27 +-
 .../hdfs/server/namenode/FSDirWriteFileOp.java  |   6 +-
 .../hdfs/server/namenode/FSDirectory.java       | 147 +--------
 .../hdfs/server/namenode/FSNamesystem.java      | 128 ++------
 7 files changed, 361 insertions(+), 257 deletions(-)
----------------------------------------------------------------------
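
The refactoring pattern, in brief: encryption zone logic moves from instance
methods on FSDirectory and FSNamesystem into static helpers in the new
FSDirEncryptionZoneOp class, which receive the FSDirectory as an explicit
parameter and acquire its lock themselves. A condensed, illustrative sketch of
that pattern (the class name EzOpSketch is hypothetical; the real code is in
the FSDirEncryptionZoneOp diff below):

    // Illustrative sketch only, not the exact patch code.
    final class EzOpSketch {
      private EzOpSketch() {}  // static-only helper, no instances

      // Callers pass the FSDirectory instead of calling fsd.getEZForPath(iip).
      static EncryptionZone getEZForPath(FSDirectory fsd, INodesInPath iip) {
        fsd.readLock();        // the helper manages fsd's read lock itself
        try {
          return fsd.ezManager.getEZINodeForPath(iip);
        } finally {
          fsd.readUnlock();
        }
      }
    }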


http://git-wip-us.apache.org/repos/asf/hadoop/blob/13bdc20d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 580629b..354c3cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -607,7 +607,10 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-9131 Move config keys used by hdfs-client to HdfsClientConfigKeys.
     (Mingliang Liu via wheat9)
-    
+  
+    HDFS-7529. Consolidate encryption zone related implementation into a single
+    class. (Rakesh R via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13bdc20d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
index e382f35..492994e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -89,7 +89,7 @@ class FSDirConcatOp {
   private static void verifyTargetFile(FSDirectory fsd, final String target,
       final INodesInPath targetIIP) throws IOException {
     // check the target
-    if (fsd.getEZForPath(targetIIP) != null) {
+    if (FSDirEncryptionZoneOp.getEZForPath(fsd, targetIIP) != null) {
       throw new HadoopIllegalArgumentException(
           "concat can not be called for files in an encryption zone.");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13bdc20d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
new file mode 100644
index 0000000..0f0b629
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.XAttrHelper;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Helper class to perform encryption zone operation.
+ */
+final class FSDirEncryptionZoneOp {
+
+  /**
+   * Private constructor for preventing FSDirEncryptionZoneOp object creation.
+   * Static-only class.
+   */
+  private FSDirEncryptionZoneOp() {}
+
+  /**
+   * Invoke KeyProvider APIs to generate an encrypted data encryption key for
+   * an encryption zone. Should not be called with any locks held.
+   *
+   * @param fsd fsdirectory
+   * @param ezKeyName key name of an encryption zone
+   * @return New EDEK, or null if ezKeyName is null
+   * @throws IOException
+   */
+  static EncryptedKeyVersion generateEncryptedDataEncryptionKey(
+      final FSDirectory fsd, final String ezKeyName) throws IOException {
+    if (ezKeyName == null) {
+      return null;
+    }
+    EncryptedKeyVersion edek = null;
+    try {
+      edek = fsd.getProvider().generateEncryptedKey(ezKeyName);
+    } catch (GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+    Preconditions.checkNotNull(edek);
+    return edek;
+  }
+
+  static KeyProvider.Metadata ensureKeyIsInitialized(final FSDirectory fsd,
+      final String keyName, final String src) throws IOException {
+    KeyProviderCryptoExtension provider = fsd.getProvider();
+    if (provider == null) {
+      throw new IOException("Can't create an encryption zone for " + src
+          + " since no key provider is available.");
+    }
+    if (keyName == null || keyName.isEmpty()) {
+      throw new IOException("Must specify a key name when creating an "
+          + "encryption zone");
+    }
+    KeyProvider.Metadata metadata = provider.getMetadata(keyName);
+    if (metadata == null) {
+      /*
+       * It would be nice if we threw something more specific than
+       * IOException when the key is not found, but the KeyProvider API
+       * doesn't provide for that. If that API is ever changed to throw
+       * something more specific (e.g. UnknownKeyException) then we can
+       * update this to match it, or better yet, just rethrow the
+       * KeyProvider's exception.
+       */
+      throw new IOException("Key " + keyName + " doesn't exist.");
+    }
+    // If the provider supports pool for EDEKs, this will fill in the pool
+    provider.warmUpEncryptedKeys(keyName);
+    return metadata;
+  }
+
+  /**
+   * Create an encryption zone on directory path using the specified key.
+   *
+   * @param fsd fsdirectory
+   * @param srcArg the path of a directory which will be the root of the
+   *               encryption zone. The directory must be empty
+   * @param pc permission checker to check fs permission
+   * @param cipher cipher
+   * @param keyName name of a key which must be present in the configured
+   *                KeyProvider
+   * @param logRetryCache whether to record RPC ids in editlog for retry cache
+   *                      rebuilding
+   * @return HdfsFileStatus
+   * @throws IOException
+   */
+  static HdfsFileStatus createEncryptionZone(final FSDirectory fsd,
+      final String srcArg, final FSPermissionChecker pc, final String cipher,
+      final String keyName, final boolean logRetryCache) throws IOException {
+    final byte[][] pathComponents = FSDirectory
+        .getPathComponentsForReservedPath(srcArg);
+    final CipherSuite suite = CipherSuite.convert(cipher);
+    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+    final String src;
+    // For now this is hard coded, as we only support one method.
+    final CryptoProtocolVersion version =
+        CryptoProtocolVersion.ENCRYPTION_ZONES;
+
+    fsd.writeLock();
+    try {
+      src = fsd.resolvePath(pc, srcArg, pathComponents);
+      final XAttr ezXAttr = fsd.ezManager.createEncryptionZone(src, suite,
+          version, keyName);
+      xAttrs.add(ezXAttr);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
+    final INodesInPath iip = fsd.getINodesInPath4Write(src, false);
+    return fsd.getAuditFileInfo(iip);
+  }
+
+  /**
+   * Get the encryption zone for the specified path.
+   *
+   * @param fsd fsdirectory
+   * @param srcArg the path of a file or directory to get the EZ for
+   * @param pc permission checker to check fs permission
+   * @return the EZ with file status.
+   */
+  static Map.Entry<EncryptionZone, HdfsFileStatus> getEZForPath(
+      final FSDirectory fsd, final String srcArg, final FSPermissionChecker pc)
+      throws IOException {
+    final byte[][] pathComponents = FSDirectory
+        .getPathComponentsForReservedPath(srcArg);
+    final String src;
+    final INodesInPath iip;
+    final EncryptionZone ret;
+    fsd.readLock();
+    try {
+      src = fsd.resolvePath(pc, srcArg, pathComponents);
+      iip = fsd.getINodesInPath(src, true);
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkPathAccess(pc, iip, FsAction.READ);
+      }
+      ret = fsd.ezManager.getEZINodeForPath(iip);
+    } finally {
+      fsd.readUnlock();
+    }
+    HdfsFileStatus auditStat = fsd.getAuditFileInfo(iip);
+    return new AbstractMap.SimpleImmutableEntry<>(ret, auditStat);
+  }
+
+  static EncryptionZone getEZForPath(final FSDirectory fsd,
+      final INodesInPath iip) {
+    fsd.readLock();
+    try {
+      return fsd.ezManager.getEZINodeForPath(iip);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  static BatchedListEntries<EncryptionZone> listEncryptionZones(
+      final FSDirectory fsd, final long prevId) throws IOException {
+    fsd.readLock();
+    try {
+      return fsd.ezManager.listEncryptionZones(prevId);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  /**
+   * Set the FileEncryptionInfo for an INode.
+   *
+   * @param fsd fsdirectory
+   * @param src the path of a directory which will be the root of the
+   *            encryption zone.
+   * @param info file encryption information
+   * @throws IOException
+   */
+  static void setFileEncryptionInfo(final FSDirectory fsd, final String src,
+      final FileEncryptionInfo info) throws IOException {
+    // Make the PB for the xattr
+    final HdfsProtos.PerFileEncryptionInfoProto proto =
+        PBHelperClient.convertPerFileEncInfo(info);
+    final byte[] protoBytes = proto.toByteArray();
+    final XAttr fileEncryptionAttr =
+        XAttrHelper.buildXAttr(CRYPTO_XATTR_FILE_ENCRYPTION_INFO, protoBytes);
+    final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+    xAttrs.add(fileEncryptionAttr);
+    fsd.writeLock();
+    try {
+      FSDirXAttrOp.unprotectedSetXAttrs(fsd, src, xAttrs,
+                                        EnumSet.of(XAttrSetFlag.CREATE));
+    } finally {
+      fsd.writeUnlock();
+    }
+  }
+
+  /**
+   * This function combines the per-file encryption info (obtained
+   * from the inode's XAttrs), and the encryption info from its zone, and
+   * returns a consolidated FileEncryptionInfo instance. Null is returned
+   * for non-encrypted files.
+   *
+   * @param fsd fsdirectory
+   * @param inode inode of the file
+   * @param snapshotId ID of the snapshot that
+   *                   we want to get encryption info from
+   * @param iip inodes in the path containing the file, passed in to
+   *            avoid obtaining the list of inodes again; if iip is
+   *            null then the list of inodes will be obtained again
+   * @return consolidated file encryption info; null for non-encrypted files
+   */
+  static FileEncryptionInfo getFileEncryptionInfo(final FSDirectory fsd,
+      final INode inode, final int snapshotId, final INodesInPath iip)
+      throws IOException {
+    if (!inode.isFile()) {
+      return null;
+    }
+    fsd.readLock();
+    try {
+      EncryptionZone encryptionZone = getEZForPath(fsd, iip);
+      if (encryptionZone == null) {
+        // not an encrypted file
+        return null;
+      } else if(encryptionZone.getPath() == null
+          || encryptionZone.getPath().isEmpty()) {
+        if (NameNode.LOG.isDebugEnabled()) {
+          NameNode.LOG.debug("Encryption zone " +
+              encryptionZone.getPath() + " does not have a valid path.");
+        }
+      }
+
+      final CryptoProtocolVersion version = encryptionZone.getVersion();
+      final CipherSuite suite = encryptionZone.getSuite();
+      final String keyName = encryptionZone.getKeyName();
+      XAttr fileXAttr = FSDirXAttrOp.unprotectedGetXAttrByPrefixedName(inode,
+          snapshotId, CRYPTO_XATTR_FILE_ENCRYPTION_INFO);
+
+      if (fileXAttr == null) {
+        NameNode.LOG.warn("Could not find encryption XAttr for file " +
+            iip.getPath() + " in encryption zone " + encryptionZone.getPath());
+        return null;
+      }
+      try {
+        HdfsProtos.PerFileEncryptionInfoProto fileProto =
+            HdfsProtos.PerFileEncryptionInfoProto.parseFrom(
+                fileXAttr.getValue());
+        return PBHelperClient.convert(fileProto, suite, version, keyName);
+      } catch (InvalidProtocolBufferException e) {
+        throw new IOException("Could not parse file encryption info for " +
+            "inode " + inode, e);
+      }
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  static boolean isInAnEZ(final FSDirectory fsd, final INodesInPath iip)
+      throws UnresolvedLinkException, SnapshotAccessControlException {
+    fsd.readLock();
+    try {
+      return fsd.ezManager.isInAnEZ(iip);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+}
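
A hedged illustration of the new calling convention (local variable names are
assumed for illustration; the actual call-site changes appear in the hunks
that follow):

    // Old: instance methods on FSDirectory took their own locks internally.
    //   FileEncryptionInfo feInfo = fsd.getFileEncryptionInfo(inode, snapshotId, iip);
    // New: static helpers on FSDirEncryptionZoneOp receive the FSDirectory.
    FileEncryptionInfo feInfo =
        FSDirEncryptionZoneOp.getFileEncryptionInfo(fsd, inode, snapshotId, iip);
    boolean inEZ =
        FSDirEncryptionZoneOp.isInAnEZ(fsd, INodesInPath.fromINode(inode));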

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13bdc20d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index f737cc3..98af592 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -178,7 +178,8 @@ class FSDirStatAndListingOp {
       }
 
       final FileEncryptionInfo feInfo = isReservedName ? null
-          : fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
+          : FSDirEncryptionZoneOp.getFileEncryptionInfo(fsd, inode,
+              iip.getPathSnapshotId(), iip);
 
       final LocatedBlocks blocks = bm.createLocatedBlocks(
           inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
@@ -439,18 +440,20 @@ class FSDirStatAndListingOp {
     long blocksize = 0;
     final boolean isEncrypted;
 
-    final FileEncryptionInfo feInfo = isRawPath ? null :
-        fsd.getFileEncryptionInfo(node, snapshot, iip);
+    final FileEncryptionInfo feInfo = isRawPath ? null : FSDirEncryptionZoneOp
+        .getFileEncryptionInfo(fsd, node, snapshot, iip);
 
     if (node.isFile()) {
       final INodeFile fileNode = node.asFile();
       size = fileNode.computeFileSize(snapshot);
       replication = fileNode.getFileReplication(snapshot);
       blocksize = fileNode.getPreferredBlockSize();
-      isEncrypted = (feInfo != null) ||
-          (isRawPath && fsd.isInAnEZ(INodesInPath.fromINode(node)));
+      isEncrypted = (feInfo != null)
+          || (isRawPath && FSDirEncryptionZoneOp.isInAnEZ(fsd,
+              INodesInPath.fromINode(node)));
     } else {
-      isEncrypted = fsd.isInAnEZ(INodesInPath.fromINode(node));
+      isEncrypted = FSDirEncryptionZoneOp.isInAnEZ(fsd,
+          INodesInPath.fromINode(node));
     }
 
     int childrenNum = node.isDirectory() ?
@@ -492,8 +495,8 @@ class FSDirStatAndListingOp {
     long blocksize = 0;
     LocatedBlocks loc = null;
     final boolean isEncrypted;
-    final FileEncryptionInfo feInfo = isRawPath ? null :
-        fsd.getFileEncryptionInfo(node, snapshot, iip);
+    final FileEncryptionInfo feInfo = isRawPath ? null : FSDirEncryptionZoneOp
+        .getFileEncryptionInfo(fsd, node, snapshot, iip);
     if (node.isFile()) {
       final INodeFile fileNode = node.asFile();
       size = fileNode.computeFileSize(snapshot);
@@ -511,10 +514,12 @@ class FSDirStatAndListingOp {
       if (loc == null) {
         loc = new LocatedBlocks();
       }
-      isEncrypted = (feInfo != null) ||
-          (isRawPath && fsd.isInAnEZ(INodesInPath.fromINode(node)));
+      isEncrypted = (feInfo != null)
+          || (isRawPath && FSDirEncryptionZoneOp.isInAnEZ(fsd,
+              INodesInPath.fromINode(node)));
     } else {
-      isEncrypted = fsd.isInAnEZ(INodesInPath.fromINode(node));
+      isEncrypted = FSDirEncryptionZoneOp.isInAnEZ(fsd,
+          INodesInPath.fromINode(node));
     }
     int childrenNum = node.isDirectory() ?
         node.asDirectory().getChildrenNum(snapshot) : 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13bdc20d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index e9d0806..575b1fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -370,7 +370,7 @@ class FSDirWriteFileOp {
 
     FileEncryptionInfo feInfo = null;
 
-    final EncryptionZone zone = fsd.getEZForPath(iip);
+    final EncryptionZone zone = FSDirEncryptionZoneOp.getEZForPath(fsd, iip);
     if (zone != null) {
       // The path is now within an EZ, but we're missing encryption parameters
       if (suite == null || edek == null) {
@@ -423,7 +423,7 @@ class FSDirWriteFileOp {
         newNode.getFileUnderConstructionFeature().getClientName(),
         newNode.getId());
     if (feInfo != null) {
-      fsd.setFileEncryptionInfo(src, feInfo);
+      FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, src, feInfo);
       newNode = fsd.getInode(newNode.getId()).asFile();
     }
     setNewINodeStoragePolicy(fsd.getBlockManager(), newNode, iip,
@@ -445,7 +445,7 @@ class FSDirWriteFileOp {
     src = fsd.resolvePath(pc, src, pathComponents);
     INodesInPath iip = fsd.getINodesInPath4Write(src);
     // Nothing to do if the path is not within an EZ
-    final EncryptionZone zone = fsd.getEZForPath(iip);
+    final EncryptionZone zone = FSDirEncryptionZoneOp.getEZForPath(fsd, iip);
     if (zone == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13bdc20d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 4dc5326..866305f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -19,28 +19,22 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.CipherSuite;
-import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
-import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -71,14 +65,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.RecursiveAction;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_PROTECTED_DIRECTORIES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
@@ -87,7 +79,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENAB
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
 import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
 
@@ -349,6 +340,10 @@ public class FSDirectory implements Closeable {
     return getFSNamesystem().getBlockManager();
   }
 
+  KeyProviderCryptoExtension getProvider() {
+    return getFSNamesystem().getProvider();
+  }
+
   /** @return the root directory inode. */
   public INodeDirectory getRoot() {
     return rootDir;
@@ -1204,138 +1199,6 @@ public class FSDirectory implements Closeable {
     }
   }
 
-  boolean isInAnEZ(INodesInPath iip)
-      throws UnresolvedLinkException, SnapshotAccessControlException {
-    readLock();
-    try {
-      return ezManager.isInAnEZ(iip);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  String getKeyName(INodesInPath iip) {
-    readLock();
-    try {
-      return ezManager.getKeyName(iip);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  XAttr createEncryptionZone(String src, CipherSuite suite,
-      CryptoProtocolVersion version, String keyName)
-    throws IOException {
-    writeLock();
-    try {
-      return ezManager.createEncryptionZone(src, suite, version, keyName);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  EncryptionZone getEZForPath(INodesInPath iip) {
-    readLock();
-    try {
-      return ezManager.getEZINodeForPath(iip);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
-      throws IOException {
-    readLock();
-    try {
-      return ezManager.listEncryptionZones(prevId);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Set the FileEncryptionInfo for an INode.
-   */
-  void setFileEncryptionInfo(String src, FileEncryptionInfo info)
-      throws IOException {
-    // Make the PB for the xattr
-    final HdfsProtos.PerFileEncryptionInfoProto proto =
-        PBHelperClient.convertPerFileEncInfo(info);
-    final byte[] protoBytes = proto.toByteArray();
-    final XAttr fileEncryptionAttr =
-        XAttrHelper.buildXAttr(CRYPTO_XATTR_FILE_ENCRYPTION_INFO, protoBytes);
-    final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
-    xAttrs.add(fileEncryptionAttr);
-
-    writeLock();
-    try {
-      FSDirXAttrOp.unprotectedSetXAttrs(this, src, xAttrs,
-                                        EnumSet.of(XAttrSetFlag.CREATE));
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * This function combines the per-file encryption info (obtained
-   * from the inode's XAttrs), and the encryption info from its zone, and
-   * returns a consolidated FileEncryptionInfo instance. Null is returned
-   * for non-encrypted files.
-   *
-   * @param inode inode of the file
-   * @param snapshotId ID of the snapshot that
-   *                   we want to get encryption info from
-   * @param iip inodes in the path containing the file, passed in to
-   *            avoid obtaining the list of inodes again; if iip is
-   *            null then the list of inodes will be obtained again
-   * @return consolidated file encryption info; null for non-encrypted files
-   */
-  FileEncryptionInfo getFileEncryptionInfo(INode inode, int snapshotId,
-      INodesInPath iip) throws IOException {
-    if (!inode.isFile()) {
-      return null;
-    }
-    readLock();
-    try {
-      EncryptionZone encryptionZone = getEZForPath(iip);
-      if (encryptionZone == null) {
-        // not an encrypted file
-        return null;
-      } else if(encryptionZone.getPath() == null
-          || encryptionZone.getPath().isEmpty()) {
-        if (NameNode.LOG.isDebugEnabled()) {
-          NameNode.LOG.debug("Encryption zone " +
-              encryptionZone.getPath() + " does not have a valid path.");
-        }
-      }
-
-      final CryptoProtocolVersion version = encryptionZone.getVersion();
-      final CipherSuite suite = encryptionZone.getSuite();
-      final String keyName = encryptionZone.getKeyName();
-
-      XAttr fileXAttr = FSDirXAttrOp.unprotectedGetXAttrByPrefixedName(inode,
-          snapshotId, CRYPTO_XATTR_FILE_ENCRYPTION_INFO);
-
-      if (fileXAttr == null) {
-        NameNode.LOG.warn("Could not find encryption XAttr for file " +
-            iip.getPath() + " in encryption zone " + encryptionZone.getPath());
-        return null;
-      }
-
-      try {
-        HdfsProtos.PerFileEncryptionInfoProto fileProto =
-            HdfsProtos.PerFileEncryptionInfoProto.parseFrom(
-                fileXAttr.getValue());
-        return PBHelperClient.convert(fileProto, suite, version, keyName);
-      } catch (InvalidProtocolBufferException e) {
-        throw new IOException("Could not parse file encryption info for " +
-            "inode " + inode, e);
-      }
-    } finally {
-      readUnlock();
-    }
-  }
-
   static INode resolveLastINode(INodesInPath iip) throws FileNotFoundException {
     INode inode = iip.getLastINode();
     if (inode == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13bdc20d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f3c0e94..bcdcc33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
@@ -105,7 +104,6 @@ import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
-import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -118,6 +116,7 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
@@ -135,10 +134,10 @@ import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.key.KeyProvider.Metadata;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
@@ -2022,29 +2021,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
-   * Invoke KeyProvider APIs to generate an encrypted data encryption key for an
-   * encryption zone. Should not be called with any locks held.
-   *
-   * @param ezKeyName key name of an encryption zone
-   * @return New EDEK, or null if ezKeyName is null
-   * @throws IOException
-   */
-  private EncryptedKeyVersion generateEncryptedDataEncryptionKey(String
-      ezKeyName) throws IOException {
-    if (ezKeyName == null) {
-      return null;
-    }
-    EncryptedKeyVersion edek = null;
-    try {
-      edek = provider.generateEncryptedKey(ezKeyName);
-    } catch (GeneralSecurityException e) {
-      throw new IOException(e);
-    }
-    Preconditions.checkNotNull(edek);
-    return edek;
-  }
-
-  /**
    * Create a new file entry in the namespace.
    * 
    * For description of parameters and exceptions thrown see
@@ -2129,7 +2105,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
       // Generate EDEK if necessary while not holding the lock
       if (ezInfo != null) {
-        ezInfo.edek = generateEncryptedDataEncryptionKey(ezInfo.ezKeyName);
+        ezInfo.edek = FSDirEncryptionZoneOp
+            .generateEncryptedDataEncryptionKey(dir, ezInfo.ezKeyName);
       }
       EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
     }
@@ -6975,74 +6952,34 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws SafeModeException       if the Namenode is in safe mode.
    */
   void createEncryptionZone(final String src, final String keyName,
-                            boolean logRetryCache)
-    throws IOException, UnresolvedLinkException,
-      SafeModeException, AccessControlException {
+      boolean logRetryCache) throws IOException, UnresolvedLinkException,
+          SafeModeException, AccessControlException {
     try {
-      if (provider == null) {
-        throw new IOException(
-            "Can't create an encryption zone for " + src +
-            " since no key provider is available.");
-      }
-      if (keyName == null || keyName.isEmpty()) {
-        throw new IOException("Must specify a key name when creating an " +
-            "encryption zone");
-      }
-      KeyProvider.Metadata metadata = provider.getMetadata(keyName);
-      if (metadata == null) {
-        /*
-         * It would be nice if we threw something more specific than
-         * IOException when the key is not found, but the KeyProvider API
-         * doesn't provide for that. If that API is ever changed to throw
-         * something more specific (e.g. UnknownKeyException) then we can
-         * update this to match it, or better yet, just rethrow the
-         * KeyProvider's exception.
-         */
-        throw new IOException("Key " + keyName + " doesn't exist.");
+      Metadata metadata = FSDirEncryptionZoneOp.ensureKeyIsInitialized(dir,
+          keyName, src);
+      checkSuperuserPrivilege();
+      FSPermissionChecker pc = getPermissionChecker();
+      checkOperation(OperationCategory.WRITE);
+      final HdfsFileStatus resultingStat;
+      writeLock();
+      try {
+        checkSuperuserPrivilege();
+        checkOperation(OperationCategory.WRITE);
+        checkNameNodeSafeMode("Cannot create encryption zone on " + src);
+        resultingStat = FSDirEncryptionZoneOp.createEncryptionZone(dir, src,
+            pc, metadata.getCipher(), keyName, logRetryCache);
+      } finally {
+        writeUnlock();
       }
-      // If the provider supports pool for EDEKs, this will fill in the pool
-      provider.warmUpEncryptedKeys(keyName);
-      createEncryptionZoneInt(src, metadata.getCipher(),
-          keyName, logRetryCache);
+
+      getEditLog().logSync();
+      logAuditEvent(true, "createEncryptionZone", src, null, resultingStat);
     } catch (AccessControlException e) {
       logAuditEvent(false, "createEncryptionZone", src);
       throw e;
     }
   }
 
-  private void createEncryptionZoneInt(final String srcArg, String cipher,
-      String keyName, final boolean logRetryCache) throws IOException {
-    String src = srcArg;
-    HdfsFileStatus resultingStat = null;
-    checkSuperuserPrivilege();
-    final byte[][] pathComponents =
-      FSDirectory.getPathComponentsForReservedPath(src);
-    FSPermissionChecker pc = getPermissionChecker();
-    writeLock();
-    try {
-      checkSuperuserPrivilege();
-      checkOperation(OperationCategory.WRITE);
-      checkNameNodeSafeMode("Cannot create encryption zone on " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-
-      final CipherSuite suite = CipherSuite.convert(cipher);
-      // For now this is hardcoded, as we only support one method.
-      final CryptoProtocolVersion version =
-          CryptoProtocolVersion.ENCRYPTION_ZONES;
-      final XAttr ezXAttr = dir.createEncryptionZone(src, suite,
-          version, keyName);
-      List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
-      xAttrs.add(ezXAttr);
-      getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
-      final INodesInPath iip = dir.getINodesInPath4Write(src, false);
-      resultingStat = dir.getAuditFileInfo(iip);
-    } finally {
-      writeUnlock();
-    }
-    getEditLog().logSync();
-    logAuditEvent(true, "createEncryptionZone", srcArg, null, resultingStat);
-  }
-
   /**
    * Get the encryption zone for the specified path.
    *
@@ -7053,25 +6990,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   EncryptionZone getEZForPath(final String srcArg)
     throws AccessControlException, UnresolvedLinkException, IOException {
-    String src = srcArg;
     HdfsFileStatus resultingStat = null;
-    final byte[][] pathComponents =
-        FSDirectory.getPathComponentsForReservedPath(src);
     boolean success = false;
     final FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath(src, true);
-      if (isPermissionEnabled) {
-        dir.checkPathAccess(pc, iip, FsAction.READ);
-      }
-      final EncryptionZone ret = dir.getEZForPath(iip);
-      resultingStat = dir.getAuditFileInfo(iip);
+      Entry<EncryptionZone, HdfsFileStatus> ezForPath = FSDirEncryptionZoneOp
+          .getEZForPath(dir, srcArg, pc);
       success = true;
-      return ret;
+      resultingStat = ezForPath.getValue();
+      return ezForPath.getKey();
     } finally {
       readUnlock();
       logAuditEvent(success, "getEZForPath", srcArg, null, resultingStat);
@@ -7088,7 +7018,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkSuperuserPrivilege();
       checkOperation(OperationCategory.READ);
       final BatchedListEntries<EncryptionZone> ret =
-          dir.listEncryptionZones(prevId);
+          FSDirEncryptionZoneOp.listEncryptionZones(dir, prevId);
       success = true;
       return ret;
     } finally {