You are viewing a plain text version of this content. The canonical (HTML) version is available from the Apache mailing-list archive; the original hyperlink was not preserved in this plain-text export.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2019/02/16 18:32:49 UTC

[hadoop] branch trunk updated: HDDS-1041. Support TDE(Transparent Data Encryption) for Ozone. Contributed by Xiaoyu Yao.

This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7ea9149  HDDS-1041. Support TDE(Transparent Data Encryption) for Ozone. Contributed by Xiaoyu Yao.
7ea9149 is described below

commit 7ea91494ceb5a31e71e677183027f68b256d411a
Author: Anu Engineer <ae...@apache.org>
AuthorDate: Sat Feb 16 10:23:10 2019 -0800

    HDDS-1041. Support TDE(Transparent Data Encryption) for Ozone.
    Contributed by Xiaoyu Yao.
---
 .../org/apache/hadoop/ozone/client/BucketArgs.java |  28 ++-
 .../apache/hadoop/ozone/client/OzoneBucket.java    |  39 ++++
 .../hadoop/ozone/client/OzoneClientUtils.java      |   4 +
 .../hadoop/ozone/client/OzoneKeyDetails.java       |  10 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  10 +
 .../hadoop/ozone/client/rest/RestClient.java       |   5 +-
 .../hadoop/ozone/client/rpc/OzoneKMSUtil.java      | 176 +++++++++++++++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  67 +++++-
 .../ozone/client/rest/response/BucketInfo.java     |  17 ++
 .../ozone/client/rest/response/KeyInfoDetails.java |  11 +
 .../hadoop/ozone/om/exceptions/OMException.java    |   9 +-
 .../ozone/om/helpers/BucketEncryptionKeyInfo.java  |  79 +++++++
 .../ozone/om/helpers/EncryptionBucketInfo.java     | 114 ++++++++++
 .../hadoop/ozone/om/helpers/OmBucketInfo.java      |  68 ++++--
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  33 ++-
 .../apache/hadoop/ozone/protocolPB/OMPBHelper.java | 129 +++++++++++
 .../hadoop/ozone/web/response/BucketInfo.java      |  10 +
 .../src/main/proto/OzoneManagerProtocol.proto      |  63 +++++-
 .../dist/src/main/compose/ozonesecure/.env         |   1 +
 .../main/compose/ozonesecure/docker-compose.yaml   |   9 +
 .../src/main/compose/ozonesecure/docker-config     |   2 +
 hadoop-ozone/integration-test/pom.xml              |  12 +-
 .../client/rpc/TestOzoneAtRestEncryption.java      | 238 +++++++++++++++++++++
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  |  57 ++++-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  93 +++++++-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  40 +++-
 .../web/ozShell/bucket/CreateBucketHandler.java    |  32 ++-
 .../hadoop/ozone/om/TestBucketManagerImpl.java     |  42 +++-
 .../hadoop/ozone/client/OzoneBucketStub.java       |   2 +-
 29 files changed, 1333 insertions(+), 67 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
index b8c05e1..5bae15d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
@@ -51,17 +51,26 @@ public final class BucketArgs {
   private Map<String, String> metadata;
 
   /**
+   * Bucket encryption key name.
+   */
+  private String bucketEncryptionKey;
+
+  /**
    * Private constructor, constructed via builder.
    * @param versioning Bucket version flag.
    * @param storageType Storage type to be used.
    * @param acls list of ACLs.
+   * @param metadata map of bucket metadata
+   * @param bucketEncryptionKey bucket encryption key name
    */
   private BucketArgs(Boolean versioning, StorageType storageType,
-                     List<OzoneAcl> acls, Map<String, String> metadata) {
+                     List<OzoneAcl> acls, Map<String, String> metadata,
+                     String bucketEncryptionKey) {
     this.acls = acls;
     this.versioning = versioning;
     this.storageType = storageType;
     this.metadata = metadata;
+    this.bucketEncryptionKey = bucketEncryptionKey;
   }
 
   /**
@@ -98,6 +107,14 @@ public final class BucketArgs {
   }
 
   /**
+   * Returns the bucket encryption key name.
+   * @return bucket encryption key
+   */
+  public String getEncryptionKey() {
+    return bucketEncryptionKey;
+  }
+
+  /**
    * Returns new builder class that builds a OmBucketInfo.
    *
    * @return Builder
@@ -114,6 +131,7 @@ public final class BucketArgs {
     private StorageType storageType;
     private List<OzoneAcl> acls;
     private Map<String, String> metadata;
+    private String bucketEncryptionKey;
 
     public Builder() {
       metadata = new HashMap<>();
@@ -138,12 +156,18 @@ public final class BucketArgs {
       this.metadata.put(key, value);
       return this;
     }
+
+    public BucketArgs.Builder setBucketEncryptionKey(String bek) {
+      this.bucketEncryptionKey = bek;
+      return this;
+    }
     /**
      * Constructs the BucketArgs.
      * @return instance of BucketArgs.
      */
     public BucketArgs build() {
-      return new BucketArgs(versioning, storageType, acls, metadata);
+      return new BucketArgs(versioning, storageType, acls, metadata,
+          bucketEncryptionKey);
     }
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 2c17558..735bc04 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -95,6 +95,37 @@ public class OzoneBucket extends WithMetadata {
   private long creationTime;
 
   /**
+   * Bucket Encryption key name if bucket encryption is enabled.
+   */
+  private String encryptionKeyName;
+
+  @SuppressWarnings("parameternumber")
+  public OzoneBucket(Configuration conf, ClientProtocol proxy,
+                     String volumeName, String bucketName,
+                     List<OzoneAcl> acls, StorageType storageType,
+                     Boolean versioning, long creationTime,
+                     Map<String, String> metadata,
+                     String encryptionKeyName) {
+    Preconditions.checkNotNull(proxy, "Client proxy is not set.");
+    this.proxy = proxy;
+    this.volumeName = volumeName;
+    this.name = bucketName;
+    this.acls = acls;
+    this.storageType = storageType;
+    this.versioning = versioning;
+    this.listCacheSize = HddsClientUtils.getListCacheSize(conf);
+    this.creationTime = creationTime;
+    this.defaultReplication = ReplicationFactor.valueOf(conf.getInt(
+        OzoneConfigKeys.OZONE_REPLICATION,
+        OzoneConfigKeys.OZONE_REPLICATION_DEFAULT));
+    this.defaultReplicationType = ReplicationType.valueOf(conf.get(
+        OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+        OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
+    this.metadata = metadata;
+    this.encryptionKeyName = encryptionKeyName;
+  }
+
+  /**
    * Constructs OzoneBucket instance.
    * @param conf Configuration object.
    * @param proxy ClientProtocol proxy.
@@ -202,6 +233,14 @@ public class OzoneBucket extends WithMetadata {
   }
 
   /**
+   * Return the bucket encryption key name.
+   * @return the bucket encryption key name
+   */
+  public String getEncryptionKeyName() {
+    return encryptionKeyName;
+  }
+
+  /**
    * Adds ACLs to the Bucket.
    * @param addAcls ACLs to be added
    * @throws IOException
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
index be1449f..9239db8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -47,6 +47,9 @@ public final class OzoneClientUtils {
     bucketInfo.setVersioning(
         OzoneConsts.Versioning.getVersioning(bucket.getVersioning()));
     bucketInfo.setAcls(bucket.getAcls());
+    bucketInfo.setEncryptionKeyName(
+        bucket.getEncryptionKeyName()==null? "N/A" :
+            bucket.getEncryptionKeyName());
     return bucketInfo;
   }
 
@@ -104,6 +107,7 @@ public final class OzoneClientUtils {
     key.getOzoneKeyLocations().forEach((a) -> keyLocations.add(new KeyLocation(
         a.getContainerID(), a.getLocalID(), a.getLength(), a.getOffset())));
     keyInfo.setKeyLocation(keyLocations);
+    keyInfo.setFileEncryptionInfo(key.getFileEncryptionInfo());
     return keyInfo;
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
index 8cae9a5..9282353 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.client;
 
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.client.ReplicationType;
 
 import java.util.List;
@@ -35,6 +36,8 @@ public class OzoneKeyDetails extends OzoneKey {
 
   private Map<String, String> metadata;
 
+  private FileEncryptionInfo feInfo;
+
   /**
    * Constructs OzoneKeyDetails from OmKeyInfo.
    */
@@ -42,11 +45,13 @@ public class OzoneKeyDetails extends OzoneKey {
   public OzoneKeyDetails(String volumeName, String bucketName, String keyName,
                          long size, long creationTime, long modificationTime,
                          List<OzoneKeyLocation> ozoneKeyLocations,
-                         ReplicationType type, Map<String, String> metadata) {
+                         ReplicationType type, Map<String, String> metadata,
+                         FileEncryptionInfo feInfo) {
     super(volumeName, bucketName, keyName, size, creationTime,
         modificationTime, type);
     this.ozoneKeyLocations = ozoneKeyLocations;
     this.metadata = metadata;
+    this.feInfo = feInfo;
   }
 
   /**
@@ -60,6 +65,9 @@ public class OzoneKeyDetails extends OzoneKey {
     return metadata;
   }
 
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return feInfo;
+  }
   /**
    * Set details of key location.
    * @param ozoneKeyLocations - details of key location
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index b94e14f..44972ae 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
@@ -81,6 +82,8 @@ public class KeyOutputStream extends OutputStream {
   private final Checksum checksum;
   private List<ByteBuffer> bufferList;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
+  private FileEncryptionInfo feInfo;
+
   /**
    * A constructor for testing purpose only.
    */
@@ -145,6 +148,9 @@ public class KeyOutputStream extends OutputStream {
     this.omClient = omClient;
     this.scmClient = scmClient;
     OmKeyInfo info = handler.getKeyInfo();
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
     this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
         .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
         .setType(type).setFactor(factor).setDataSize(info.getDataSize())
@@ -547,6 +553,10 @@ public class KeyOutputStream extends OutputStream {
     return commitUploadPartInfo;
   }
 
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return feInfo;
+  }
+
   /**
    * Builder class of KeyOutputStream.
    */
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
index 013cb8f..ba21ca7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
@@ -614,7 +614,8 @@ public class RestClient implements ClientProtocol {
             bucketInfo.getBucketName(), bucketInfo.getAcls(),
             bucketInfo.getStorageType(),
             getBucketVersioningFlag(bucketInfo.getVersioning()), creationTime,
-            new HashMap<>());
+            new HashMap<>(), bucketInfo
+            .getEncryptionKeyName());
       }).collect(Collectors.toList());
     } catch (URISyntaxException e) {
       throw new IOException(e);
@@ -870,7 +871,7 @@ public class RestClient implements ClientProtocol {
           HddsClientUtils.formatDateTime(keyInfo.getModifiedOn()),
           ozoneKeyLocations, ReplicationType.valueOf(
               keyInfo.getType().toString()),
-          new HashMap<>());
+          new HashMap<>(), keyInfo.getFileEncryptionInfo());
       EntityUtils.consume(response);
       return key;
     } catch (URISyntaxException | ParseException e) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneKMSUtil.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneKMSUtil.java
new file mode 100644
index 0000000..ba00a31
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneKMSUtil.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.KMSUtil;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+
+/**
+ * KMS utility class for Ozone Data Encryption At-Rest.
+ */
+public final class OzoneKMSUtil {
+
+  private static final String UTF8_CSN = StandardCharsets.UTF_8.name();
+  private static final String O3_KMS_PREFIX = "ozone-kms-";
+  private static String keyProviderUriKeyName =
+      "hadoop.security.key.provider.path";
+
+  private OzoneKMSUtil() {
+  }
+
+  public static KeyProvider.KeyVersion decryptEncryptedDataEncryptionKey(
+      FileEncryptionInfo feInfo, KeyProvider keyProvider) throws IOException {
+    if (keyProvider == null) {
+      throw new IOException("No KeyProvider is configured, " +
+          "cannot access an encrypted file");
+    } else {
+      EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
+          feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
+          feInfo.getEncryptedDataEncryptionKey());
+
+      try {
+        KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
+            .createKeyProviderCryptoExtension(keyProvider);
+        return cryptoProvider.decryptEncryptedKey(ekv);
+      } catch (GeneralSecurityException gse) {
+        throw new IOException(gse);
+      }
+    }
+  }
+
+  /**
+   * Returns a key to map ozone uri to key provider uri.
+   * Tasks will lookup this key to find key Provider.
+   */
+  public static Text getKeyProviderMapKey(URI namespaceUri) {
+    return new Text(O3_KMS_PREFIX + namespaceUri.getScheme()
+        +"://" + namespaceUri.getAuthority());
+  }
+
+  public static String bytes2String(byte[] bytes) {
+    return bytes2String(bytes, 0, bytes.length);
+  }
+
+  private static String bytes2String(byte[] bytes, int offset, int length) {
+    try {
+      return new String(bytes, offset, length, UTF8_CSN);
+    } catch (UnsupportedEncodingException e) {
+      throw new IllegalArgumentException("UTF8 encoding is not supported", e);
+    }
+  }
+
+  public static URI getKeyProviderUri(UserGroupInformation ugi,
+      URI namespaceUri, String kmsUriSrv, Configuration conf)
+      throws IOException {
+    URI keyProviderUri = null;
+    Credentials credentials = ugi.getCredentials();
+    Text credsKey = null;
+    if (namespaceUri != null) {
+      // from ugi
+      credsKey = getKeyProviderMapKey(namespaceUri);
+      byte[] keyProviderUriBytes = credentials.getSecretKey(credsKey);
+      if (keyProviderUriBytes != null) {
+        keyProviderUri = URI.create(bytes2String(keyProviderUriBytes));
+      }
+    }
+    if (keyProviderUri == null) {
+      // from client conf
+      if (kmsUriSrv == null) {
+        keyProviderUri = KMSUtil.getKeyProviderUri(
+            conf, keyProviderUriKeyName);
+      } else if (!kmsUriSrv.isEmpty()) {
+        // from om server
+        keyProviderUri = URI.create(kmsUriSrv);
+      }
+    }
+    // put back into UGI
+    if (keyProviderUri != null && credsKey != null) {
+      credentials.addSecretKey(
+          credsKey, DFSUtil.string2Bytes(keyProviderUri.toString()));
+    }
+
+    return keyProviderUri;
+  }
+
+  public static KeyProvider getKeyProvider(final Configuration conf,
+      final URI serverProviderUri) throws IOException{
+    return KMSUtil.createKeyProviderFromUri(conf, serverProviderUri);
+  }
+
+  public static CryptoProtocolVersion getCryptoProtocolVersion(
+      FileEncryptionInfo feInfo) throws IOException {
+    CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion();
+    if (!CryptoProtocolVersion.supports(version)) {
+      throw new IOException("Client does not support specified " +
+              "CryptoProtocolVersion " + version.getDescription() +
+              " version number" + version.getVersion());
+    } else {
+      return version;
+    }
+  }
+
+  public static void checkCryptoProtocolVersion(
+          FileEncryptionInfo feInfo) throws IOException {
+    CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion();
+    if (!CryptoProtocolVersion.supports(version)) {
+      throw new IOException("Client does not support specified " +
+              "CryptoProtocolVersion " + version.getDescription() +
+              " version number" + version.getVersion());
+    }
+  }
+
+  public static CryptoCodec getCryptoCodec(Configuration conf,
+      FileEncryptionInfo feInfo) throws IOException {
+    CipherSuite suite = feInfo.getCipherSuite();
+    if (suite.equals(CipherSuite.UNKNOWN)) {
+      throw new IOException("NameNode specified unknown CipherSuite with ID " +
+              suite.getUnknownValue() + ", cannot instantiate CryptoCodec.");
+    } else {
+      CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
+      if (codec == null) {
+        throw new OMException("No configuration found for the cipher suite " +
+                suite.getConfigSuffix() + " prefixed with " +
+                "hadoop.security.crypto.codec.classes. Please see the" +
+                " example configuration hadoop.security.crypto.codec.classes." +
+                "EXAMPLE CIPHER SUITE at core-default.xml for details.",
+                OMException.ResultCodes.UNKNOWN_CIPHER_SUITE);
+      } else {
+        return codec;
+      }
+    }
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 5a48d8a..eb27b7b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.ozone.client.rpc;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.crypto.CryptoOutputStream;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -45,6 +49,7 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -366,6 +371,12 @@ public class RpcClient implements ClientProtocol {
         Boolean.FALSE : bucketArgs.getVersioning();
     StorageType storageType = bucketArgs.getStorageType() == null ?
         StorageType.DEFAULT : bucketArgs.getStorageType();
+    BucketEncryptionKeyInfo bek = null;
+    if (bucketArgs.getEncryptionKey() != null) {
+      bek = new BucketEncryptionKeyInfo.Builder()
+          .setKeyName(bucketArgs.getEncryptionKey()).build();
+    }
+
     List<OzoneAcl> listOfAcls = new ArrayList<>();
     //User ACL
     listOfAcls.add(new OzoneAcl(OzoneAcl.OzoneACLType.USER,
@@ -388,9 +399,13 @@ public class RpcClient implements ClientProtocol {
         .setStorageType(storageType)
         .setAcls(listOfAcls.stream().distinct().collect(Collectors.toList()));
 
+    if (bek != null) {
+      builder.setBucketEncryptionKey(bek);
+    }
+
     LOG.info("Creating Bucket: {}/{}, with Versioning {} and " +
-            "Storage Type set to {}", volumeName, bucketName, isVersionEnabled,
-            storageType);
+            "Storage Type set to {} and Encryption set to {} ",
+        volumeName, bucketName, isVersionEnabled, storageType, bek != null);
     ozoneManagerClient.createBucket(builder.build());
   }
 
@@ -526,7 +541,9 @@ public class RpcClient implements ClientProtocol {
         bucketInfo.getStorageType(),
         bucketInfo.getIsVersionEnabled(),
         bucketInfo.getCreationTime(),
-        bucketInfo.getMetadata());
+        bucketInfo.getMetadata(),
+        bucketInfo.getEncryptionKeyInfo() != null ? bucketInfo
+            .getEncryptionKeyInfo().getKeyName() : null);
   }
 
   @Override
@@ -545,7 +562,9 @@ public class RpcClient implements ClientProtocol {
         bucket.getStorageType(),
         bucket.getIsVersionEnabled(),
         bucket.getCreationTime(),
-        bucket.getMetadata()))
+        bucket.getMetadata(),
+        bucket.getEncryptionKeyInfo() != null ? bucket
+            .getEncryptionKeyInfo().getKeyName() : null))
         .collect(Collectors.toList());
   }
 
@@ -588,7 +607,29 @@ public class RpcClient implements ClientProtocol {
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),
         openKey.getOpenVersion());
-    return new OzoneOutputStream(groupOutputStream);
+    final FileEncryptionInfo feInfo = groupOutputStream
+        .getFileEncryptionInfo();
+    if (feInfo != null) {
+      KeyProvider.KeyVersion decrypted = getDEK(feInfo);
+      final CryptoOutputStream cryptoOut = new CryptoOutputStream(
+          groupOutputStream, OzoneKMSUtil.getCryptoCodec(conf, feInfo),
+          decrypted.getMaterial(), feInfo.getIV());
+      return new OzoneOutputStream(cryptoOut);
+    } else {
+      return new OzoneOutputStream(groupOutputStream);
+    }
+  }
+
+  private KeyProvider.KeyVersion getDEK(FileEncryptionInfo feInfo)
+      throws IOException {
+    // check crypto protocol version
+    OzoneKMSUtil.checkCryptoProtocolVersion(feInfo);
+    KeyProvider.KeyVersion decrypted;
+    // TODO: support get kms uri from om rpc server.
+    decrypted = OzoneKMSUtil.decryptEncryptedDataEncryptionKey(feInfo,
+        OzoneKMSUtil.getKeyProvider(conf, OzoneKMSUtil.getKeyProviderUri(
+            ugi, null, null, conf)));
+    return decrypted;
   }
 
   @Override
@@ -608,6 +649,15 @@ public class RpcClient implements ClientProtocol {
         KeyInputStream.getFromOmKeyInfo(
             keyInfo, xceiverClientManager, storageContainerLocationClient,
             requestId);
+    FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
+    if (feInfo != null) {
+      final KeyProvider.KeyVersion decrypted  = getDEK(feInfo);
+      final CryptoInputStream cryptoIn =
+          new CryptoInputStream(lengthInputStream.getWrappedStream(),
+              OzoneKMSUtil.getCryptoCodec(conf, feInfo),
+              decrypted.getMaterial(), feInfo.getIV());
+      return new OzoneInputStream(cryptoIn);
+    }
     return new OzoneInputStream(lengthInputStream.getWrappedStream());
   }
 
@@ -678,7 +728,8 @@ public class RpcClient implements ClientProtocol {
     return new OzoneKeyDetails(keyInfo.getVolumeName(), keyInfo.getBucketName(),
         keyInfo.getKeyName(), keyInfo.getDataSize(), keyInfo.getCreationTime(),
         keyInfo.getModificationTime(), ozoneKeyLocations, ReplicationType
-        .valueOf(keyInfo.getType().toString()), keyInfo.getMetadata());
+        .valueOf(keyInfo.getType().toString()), keyInfo.getMetadata(),
+        keyInfo.getFileEncryptionInfo());
   }
 
   @Override
@@ -738,7 +789,9 @@ public class RpcClient implements ClientProtocol {
         bucket.getStorageType(),
         bucket.getIsVersionEnabled(),
         bucket.getCreationTime(),
-        bucket.getMetadata()))
+        bucket.getMetadata(),
+        bucket.getEncryptionKeyInfo() != null ?
+            bucket.getEncryptionKeyInfo().getKeyName(): null))
         .collect(Collectors.toList());
   }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/rest/response/BucketInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/rest/response/BucketInfo.java
index d515dc2..61d051d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/rest/response/BucketInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/rest/response/BucketInfo.java
@@ -44,6 +44,7 @@ public class BucketInfo implements Comparable<BucketInfo> {
   private List<OzoneAcl> acls;
   private OzoneConsts.Versioning versioning;
   private StorageType storageType;
+  private String bekName;
 
   /**
    * Constructor for BucketInfo.
@@ -185,6 +186,22 @@ public class BucketInfo implements Comparable<BucketInfo> {
   }
 
   /**
+   * Return bucket encryption key name.
+   * @return bucket encryption key name
+   */
+  public String getEncryptionKeyName() {
+    return bekName;
+  }
+
+  /**
+   * Sets the bucket encryption key name.
+   * @param name bucket encryption key name
+   */
+  public void setEncryptionKeyName(String name) {
+    this.bekName = name;
+  }
+
+  /**
    * Compares this object with the specified object for order.  Returns a
    * negative integer, zero, or a positive integer as this object is less
    * than, equal to, or greater than the specified object.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/rest/response/KeyInfoDetails.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/rest/response/KeyInfoDetails.java
index 98506f0..b153239 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/rest/response/KeyInfoDetails.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/rest/response/KeyInfoDetails.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 
 /**
  * KeyInfoDetails class is used for parsing json response
@@ -41,6 +42,8 @@ public class KeyInfoDetails extends KeyInfo {
    */
   private List<KeyLocation> keyLocations;
 
+  private FileEncryptionInfo feInfo;
+
   /**
    * Constructor needed for json serialization.
    */
@@ -65,6 +68,14 @@ public class KeyInfoDetails extends KeyInfo {
     return keyLocations;
   }
 
+  public void setFileEncryptionInfo(FileEncryptionInfo info) {
+    this.feInfo = info;
+  }
+
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return feInfo;
+  }
+
   /**
    * Parse a string to return KeyInfoDetails Object.
    *
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 609000c..a279e02 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -177,6 +177,13 @@ public class OMException extends IOException {
 
     SCM_IN_CHILL_MODE,
 
-    INVALID_REQUEST
+    INVALID_REQUEST,
+
+    BUCKET_ENCRYPTION_KEY_NOT_FOUND,
+
+    UNKNOWN_CIPHER_SUITE,
+
+    INVALID_KMS_PROVIDER,
+
   }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketEncryptionKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketEncryptionKeyInfo.java
new file mode 100644
index 0000000..e1ae0bb
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketEncryptionKeyInfo.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+
+/**
+ * Encryption key info for bucket encryption key.
+ */
+public class BucketEncryptionKeyInfo {
+  private final CryptoProtocolVersion version;
+  private final CipherSuite suite;
+  private final String keyName;
+
+  public BucketEncryptionKeyInfo(
+      CryptoProtocolVersion version, CipherSuite suite,
+      String keyName) {
+    this.version = version;
+    this.suite = suite;
+    this.keyName = keyName;
+  }
+
+  public String getKeyName() {
+    return keyName;
+  }
+
+  public CipherSuite getSuite() {
+    return suite;
+  }
+
+  public CryptoProtocolVersion getVersion() {
+    return version;
+  }
+
+  /**
+   * Builder for BucketEncryptionKeyInfo.
+   */
+  public static class Builder {
+    private CryptoProtocolVersion version;
+    private CipherSuite suite;
+    private String keyName;
+
+    public Builder setKeyName(String name) {
+      this.keyName = name;
+      return this;
+    }
+
+    public Builder setSuite(CipherSuite cs) {
+      this.suite = cs;
+      return this;
+    }
+
+    public Builder setVersion(CryptoProtocolVersion ver) {
+      this.version = ver;
+      return this;
+    }
+
+    public BucketEncryptionKeyInfo build() {
+      return new BucketEncryptionKeyInfo(version, suite, keyName);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/EncryptionBucketInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/EncryptionBucketInfo.java
new file mode 100644
index 0000000..0f82fe5
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/EncryptionBucketInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+
+/**
+ * A simple class for representing an encryption bucket. Presently an encryption
+ * bucket only has a path (the root of the encrypted bucket), a key name, and a
+ * unique id. The id is used to implement batched listing of encryption buckets.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class EncryptionBucketInfo {
+
+  private final CipherSuite suite;
+  private final CryptoProtocolVersion version;
+  private final String keyName;
+
+  private final long id;
+  private final String path;
+
+  public EncryptionBucketInfo(long id, String path, CipherSuite suite,
+                        CryptoProtocolVersion version, String keyName) {
+    this.id = id;
+    this.path = path;
+    this.suite = suite;
+    this.version = version;
+    this.keyName = keyName;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public CipherSuite getSuite() {
+    return suite;
+  }
+
+  public CryptoProtocolVersion getVersion() {
+    return version;
+  }
+
+  public String getKeyName() {
+    return keyName;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(13, 31)
+        .append(id)
+        .append(path)
+        .append(suite)
+        .append(version)
+        .append(keyName).
+            toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (obj == this) {
+      return true;
+    }
+    if (obj.getClass() != getClass()) {
+      return false;
+    }
+
+    EncryptionBucketInfo rhs = (EncryptionBucketInfo) obj;
+    return new EqualsBuilder().
+        append(id, rhs.id).
+        append(path, rhs.path).
+        append(suite, rhs.suite).
+        append(version, rhs.version).
+        append(keyName, rhs.keyName).
+        isEquals();
+  }
+
+  @Override
+  public String toString() {
+    return "EncryptionBucketInfo [id=" + id +
+        ", path=" + path +
+        ", suite=" + suite +
+        ", version=" + version +
+        ", keyName=" + keyName + "]";
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
index 14ed5f6..4cdaa48 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
@@ -66,6 +66,11 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
   private final long creationTime;
 
   /**
+   * Bucket encryption key info if encryption is enabled.
+   */
+  private BucketEncryptionKeyInfo bekInfo;
+
+  /**
    * Private constructor, constructed via builder.
    * @param volumeName - Volume name.
    * @param bucketName - Bucket name.
@@ -73,14 +78,18 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
    * @param isVersionEnabled - Bucket version flag.
    * @param storageType - Storage type to be used.
    * @param creationTime - Bucket creation time.
+   * @param metadata - metadata.
+   * @param bekInfo - bucket encryption key info.
    */
+  @SuppressWarnings("checkstyle:ParameterNumber")
   private OmBucketInfo(String volumeName,
                        String bucketName,
                        List<OzoneAcl> acls,
                        boolean isVersionEnabled,
                        StorageType storageType,
                        long creationTime,
-                       Map<String, String> metadata) {
+                       Map<String, String> metadata,
+                       BucketEncryptionKeyInfo bekInfo) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.acls = acls;
@@ -88,6 +97,7 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
     this.storageType = storageType;
     this.creationTime = creationTime;
     this.metadata = metadata;
+    this.bekInfo = bekInfo;
   }
 
   /**
@@ -140,6 +150,15 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
   }
 
   /**
+   * Returns bucket encryption key info.
+   * @return bucket encryption key info
+   */
+  public BucketEncryptionKeyInfo getEncryptionKeyInfo() {
+    return bekInfo;
+  }
+
+
+  /**
    * Returns new builder class that builds a OmBucketInfo.
    *
    * @return Builder
@@ -174,6 +193,7 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
     private StorageType storageType;
     private long creationTime;
     private Map<String, String> metadata;
+    private BucketEncryptionKeyInfo bekInfo;
 
     public Builder() {
       //Default values
@@ -225,6 +245,12 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
       return this;
     }
 
+    public Builder setBucketEncryptionKey(
+        BucketEncryptionKeyInfo info) {
+      this.bekInfo = info;
+      return this;
+    }
+
     /**
      * Constructs the OmBucketInfo.
      * @return instance of OmBucketInfo.
@@ -237,8 +263,7 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
       Preconditions.checkNotNull(storageType);
 
       return new OmBucketInfo(volumeName, bucketName, acls,
-          isVersionEnabled, storageType, creationTime, metadata
-      );
+          isVersionEnabled, storageType, creationTime, metadata, bekInfo);
     }
   }
 
@@ -246,7 +271,7 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
    * Creates BucketInfo protobuf from OmBucketInfo.
    */
   public BucketInfo getProtobuf() {
-    return BucketInfo.newBuilder()
+    BucketInfo.Builder bib =  BucketInfo.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
         .addAllAcls(acls.stream().map(
@@ -254,8 +279,11 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
         .setIsVersionEnabled(isVersionEnabled)
         .setStorageType(storageType.toProto())
         .setCreationTime(creationTime)
-        .addAllMetadata(KeyValueUtil.toProtobuf(metadata))
-        .build();
+        .addAllMetadata(KeyValueUtil.toProtobuf(metadata));
+    if (bekInfo != null && bekInfo.getKeyName() != null) {
+      bib.setBeinfo(OMPBHelper.convert(bekInfo));
+    }
+    return bib.build();
   }
 
   /**
@@ -264,15 +292,22 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
    * @return instance of OmBucketInfo
    */
   public static OmBucketInfo getFromProtobuf(BucketInfo bucketInfo) {
-    return new OmBucketInfo(
-        bucketInfo.getVolumeName(),
-        bucketInfo.getBucketName(),
-        bucketInfo.getAclsList().stream().map(
-            OMPBHelper::convertOzoneAcl).collect(Collectors.toList()),
-        bucketInfo.getIsVersionEnabled(),
-        StorageType.valueOf(bucketInfo.getStorageType()),
-        bucketInfo.getCreationTime(),
-        KeyValueUtil.getFromProtobuf(bucketInfo.getMetadataList()));
+    OmBucketInfo.Builder obib = OmBucketInfo.newBuilder()
+        .setVolumeName(bucketInfo.getVolumeName())
+        .setBucketName(bucketInfo.getBucketName())
+        .setAcls(bucketInfo.getAclsList().stream().map(
+            OMPBHelper::convertOzoneAcl).collect(Collectors.toList()))
+        .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
+        .setStorageType(StorageType.valueOf(bucketInfo.getStorageType()))
+        .setCreationTime(bucketInfo.getCreationTime());
+    if (bucketInfo.getMetadataList() != null) {
+      obib.addAllMetadata(KeyValueUtil
+          .getFromProtobuf(bucketInfo.getMetadataList()));
+    }
+    if (bucketInfo.hasBeinfo()) {
+      obib.setBucketEncryptionKey(OMPBHelper.convert(bucketInfo.getBeinfo()));
+    }
+    return obib.build();
   }
 
   @Override
@@ -290,7 +325,8 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
         Objects.equals(acls, that.acls) &&
         Objects.equals(isVersionEnabled, that.isVersionEnabled) &&
         storageType == that.storageType &&
-        Objects.equals(metadata, that.metadata);
+        Objects.equals(metadata, that.metadata) &&
+        Objects.equals(bekInfo, that.bekInfo);
   }
 
   @Override
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 27a85d7..07f7909 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -25,8 +25,10 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.base.Preconditions;
@@ -47,6 +49,7 @@ public final class OmKeyInfo extends WithMetadata {
   private long modificationTime;
   private HddsProtos.ReplicationType type;
   private HddsProtos.ReplicationFactor factor;
+  private FileEncryptionInfo encInfo;
 
   @SuppressWarnings("parameternumber")
   OmKeyInfo(String volumeName, String bucketName, String keyName,
@@ -54,7 +57,8 @@ public final class OmKeyInfo extends WithMetadata {
       long creationTime, long modificationTime,
       HddsProtos.ReplicationType type,
       HddsProtos.ReplicationFactor factor,
-      Map<String, String> metadata) {
+      Map<String, String> metadata,
+      FileEncryptionInfo encInfo) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
@@ -76,6 +80,7 @@ public final class OmKeyInfo extends WithMetadata {
     this.factor = factor;
     this.type = type;
     this.metadata = metadata;
+    this.encInfo = encInfo;
   }
 
   public String getVolumeName() {
@@ -207,6 +212,10 @@ public final class OmKeyInfo extends WithMetadata {
     this.modificationTime = modificationTime;
   }
 
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return encInfo;
+  }
+
   /**
    * Builder of OmKeyInfo.
    */
@@ -222,6 +231,7 @@ public final class OmKeyInfo extends WithMetadata {
     private HddsProtos.ReplicationType type;
     private HddsProtos.ReplicationFactor factor;
     private Map<String, String> metadata;
+    private FileEncryptionInfo encInfo;
 
     public Builder() {
       this.metadata = new HashMap<>();
@@ -284,17 +294,23 @@ public final class OmKeyInfo extends WithMetadata {
       return this;
     }
 
+    public Builder setFileEncryptionInfo(FileEncryptionInfo feInfo) {
+      this.encInfo = feInfo;
+      return this;
+    }
+
     public OmKeyInfo build() {
       return new OmKeyInfo(
           volumeName, bucketName, keyName, omKeyLocationInfoGroups,
-          dataSize, creationTime, modificationTime, type, factor, metadata);
+          dataSize, creationTime, modificationTime, type, factor, metadata,
+          encInfo);
     }
   }
 
   public KeyInfo getProtobuf() {
     long latestVersion = keyLocationVersions.size() == 0 ? -1 :
         keyLocationVersions.get(keyLocationVersions.size() - 1).getVersion();
-    return KeyInfo.newBuilder()
+    KeyInfo.Builder kb = KeyInfo.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
         .setKeyName(keyName)
@@ -307,8 +323,11 @@ public final class OmKeyInfo extends WithMetadata {
         .setLatestVersion(latestVersion)
         .setCreationTime(creationTime)
         .setModificationTime(modificationTime)
-        .addAllMetadata(KeyValueUtil.toProtobuf(metadata))
-        .build();
+        .addAllMetadata(KeyValueUtil.toProtobuf(metadata));
+    if (encInfo != null) {
+      kb.setFileEncryptionInfo(OMPBHelper.convert(encInfo));
+    }
+    return kb.build();
   }
 
   public static OmKeyInfo getFromProtobuf(KeyInfo keyInfo) {
@@ -324,7 +343,9 @@ public final class OmKeyInfo extends WithMetadata {
         keyInfo.getModificationTime(),
         keyInfo.getType(),
         keyInfo.getFactor(),
-        KeyValueUtil.getFromProtobuf(keyInfo.getMetadataList()));
+        KeyValueUtil.getFromProtobuf(keyInfo.getMetadataList()),
+        keyInfo.hasFileEncryptionInfo() ? OMPBHelper.convert(keyInfo
+            .getFileEncryptionInfo()) : null);
   }
 
   @Override
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
index df069ce..fa6fc85 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
@@ -18,8 +18,22 @@
 package org.apache.hadoop.ozone.protocolPB;
 
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .BucketEncryptionInfoProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CipherSuiteProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CryptoProtocolVersionProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .FileEncryptionInfoProto;
+
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto
@@ -148,4 +162,119 @@ public final class OMPBHelper {
         .toByteArray(), tokenProto.getPassword().toByteArray(), new Text(
         tokenProto.getKind()), new Text(tokenProto.getService()));
   }
+
+  public static BucketEncryptionKeyInfo convert(
+      BucketEncryptionInfoProto beInfo) {
+    if (beInfo == null) {
+      throw new IllegalArgumentException("Invalid argument: bucket encryption" +
+          " info is null");
+    }
+
+    return new BucketEncryptionKeyInfo(
+        beInfo.hasCryptoProtocolVersion() ?
+            convert(beInfo.getCryptoProtocolVersion()) : null,
+        beInfo.hasSuite() ? convert(beInfo.getSuite()) : null,
+        beInfo.getKeyName());
+  }
+
+
+  public static BucketEncryptionInfoProto convert(
+      BucketEncryptionKeyInfo beInfo) {
+    if (beInfo == null || beInfo.getKeyName() == null) {
+      throw new IllegalArgumentException("Invalid argument: bucket encryption" +
+          " info is null");
+    }
+
+    BucketEncryptionInfoProto.Builder bb = BucketEncryptionInfoProto
+        .newBuilder().setKeyName(beInfo.getKeyName());
+
+    if (beInfo.getSuite() != null) {
+      bb.setSuite(convert(beInfo.getSuite()));
+    }
+    if (beInfo.getVersion() != null) {
+      bb.setCryptoProtocolVersion(convert(beInfo.getVersion()));
+    }
+    return bb.build();
+  }
+
+  public static FileEncryptionInfoProto convert(
+      FileEncryptionInfo info) {
+    if (info == null) {
+      return null;
+    }
+    return OzoneManagerProtocolProtos.FileEncryptionInfoProto.newBuilder()
+        .setSuite(convert(info.getCipherSuite()))
+        .setCryptoProtocolVersion(convert(info.getCryptoProtocolVersion()))
+        .setKey(getByteString(info.getEncryptedDataEncryptionKey()))
+        .setIv(getByteString(info.getIV()))
+        .setEzKeyVersionName(info.getEzKeyVersionName())
+        .setKeyName(info.getKeyName())
+        .build();
+  }
+
+  public static FileEncryptionInfo convert(FileEncryptionInfoProto proto) {
+    if (proto == null) {
+      return null;
+    }
+    CipherSuite suite = convert(proto.getSuite());
+    CryptoProtocolVersion version = convert(proto.getCryptoProtocolVersion());
+    byte[] key = proto.getKey().toByteArray();
+    byte[] iv = proto.getIv().toByteArray();
+    String ezKeyVersionName = proto.getEzKeyVersionName();
+    String keyName = proto.getKeyName();
+    return new FileEncryptionInfo(suite, version, key, iv, keyName,
+        ezKeyVersionName);
+  }
+
+  public static CipherSuite convert(CipherSuiteProto proto) {
+    switch(proto) {
+    case AES_CTR_NOPADDING:
+      return CipherSuite.AES_CTR_NOPADDING;
+    default:
+      // Set to UNKNOWN and stash the unknown enum value
+      CipherSuite suite = CipherSuite.UNKNOWN;
+      suite.setUnknownValue(proto.getNumber());
+      return suite;
+    }
+  }
+
+  public static CipherSuiteProto convert(CipherSuite suite) {
+    switch (suite) {
+    case UNKNOWN:
+      return CipherSuiteProto.UNKNOWN;
+    case AES_CTR_NOPADDING:
+      return CipherSuiteProto.AES_CTR_NOPADDING;
+    default:
+      return null;
+    }
+  }
+
+  public static CryptoProtocolVersionProto convert(
+      CryptoProtocolVersion version) {
+    switch(version) {
+    case UNKNOWN:
+      return OzoneManagerProtocolProtos.CryptoProtocolVersionProto
+          .UNKNOWN_PROTOCOL_VERSION;
+    case ENCRYPTION_ZONES:
+      return OzoneManagerProtocolProtos.CryptoProtocolVersionProto
+          .ENCRYPTION_ZONES;
+    default:
+      return null;
+    }
+  }
+
+  public static CryptoProtocolVersion convert(
+      CryptoProtocolVersionProto proto) {
+    switch(proto) {
+    case ENCRYPTION_ZONES:
+      return CryptoProtocolVersion.ENCRYPTION_ZONES;
+    default:
+      // Set to UNKNOWN and stash the unknown enum value
+      CryptoProtocolVersion version = CryptoProtocolVersion.UNKNOWN;
+      version.setUnknownValue(proto.getNumber());
+      return version;
+    }
+  }
+
+
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/response/BucketInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/response/BucketInfo.java
index 26798e9..b431f19 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/response/BucketInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/response/BucketInfo.java
@@ -68,6 +68,7 @@ public class BucketInfo implements Comparable<BucketInfo> {
   private StorageType storageType;
   private long bytesUsed;
   private long keyCount;
+  private String encryptionKeyName;
 
   /**
    * Constructor for BucketInfo.
@@ -192,6 +193,15 @@ public class BucketInfo implements Comparable<BucketInfo> {
     return createdOn;
   }
 
+
+  public void setEncryptionKeyName(String encryptionKeyName) {
+    this.encryptionKeyName = encryptionKeyName;
+  }
+
+  public String getEncryptionKeyName() {
+    return encryptionKeyName;
+  }
+
   /**
    * Returns a JSON string of this object.
    * After stripping out bytesUsed and keyCount
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 12ec29b..a557c3e 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -223,6 +223,10 @@ enum Status {
     SCM_IN_CHILL_MODE = 38;
     INVALID_REQUEST = 39;
 
+    BUCKET_ENCRYPTION_KEY_NOT_FOUND = 40;
+    UNKNOWN_CIPHER_SUITE = 41;
+    INVALID_KMS_PROVIDER = 42;
+
 }
 
 
@@ -330,7 +334,7 @@ message BucketInfo {
     required StorageTypeProto storageType = 5 [default = DISK];
     required uint64 creationTime = 6;
     repeated hadoop.hdds.KeyValue metadata = 7;
-
+    optional BucketEncryptionInfoProto beinfo = 8;
 }
 
 enum StorageTypeProto {
@@ -340,6 +344,60 @@ enum StorageTypeProto {
     RAM_DISK = 4;
 }
 
+/**
+ * Cipher suite.
+ */
+enum CipherSuiteProto {
+    UNKNOWN = 1;
+    AES_CTR_NOPADDING = 2;
+}
+
+/**
+ * Crypto protocol version used to access encrypted files.
+ */
+enum CryptoProtocolVersionProto {
+    UNKNOWN_PROTOCOL_VERSION = 1;
+    ENCRYPTION_ZONES = 2;
+}
+/**
+ * Encryption information for bucket (bucket key)
+ */
+message BucketEncryptionInfoProto {
+    required string keyName = 1;
+    optional CipherSuiteProto suite = 2;
+    optional CryptoProtocolVersionProto cryptoProtocolVersion = 3;
+}
+
+/**
+ * Encryption information for a file.
+ */
+message FileEncryptionInfoProto {
+    required CipherSuiteProto suite = 1;
+    required CryptoProtocolVersionProto cryptoProtocolVersion = 2;
+    required bytes key = 3;
+    required bytes iv = 4;
+    required string keyName = 5;
+    required string ezKeyVersionName = 6;
+}
+
+/**
+ * Encryption information for an individual
+ * file within an encryption zone
+ */
+message PerFileEncryptionInfoProto {
+    required bytes key = 1;
+    required bytes iv = 2;
+    required string ezKeyVersionName = 3;
+}
+
+message DataEncryptionKeyProto {
+    required uint32 keyId = 1;
+    required bytes nonce = 3;
+    required bytes encryptionKey = 4;
+    required uint64 expiryDate = 5;
+    optional string encryptionAlgorithm = 6;
+}
+
 message BucketArgs {
     required string volumeName = 1;
     required string bucketName = 2;
@@ -439,6 +497,7 @@ message KeyLocation {
 message KeyLocationList {
     optional uint64 version = 1;
     repeated KeyLocation keyLocations = 2;
+    optional FileEncryptionInfoProto fileEncryptionInfo = 3;
 }
 
 message KeyInfo {
@@ -453,7 +512,7 @@ message KeyInfo {
     required uint64 modificationTime = 9;
     optional uint64 latestVersion = 10;
     repeated hadoop.hdds.KeyValue metadata = 11;
-
+    optional FileEncryptionInfoProto fileEncryptionInfo = 12;
 }
 
 message CreateKeyRequest {
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/.env b/hadoop-ozone/dist/src/main/compose/ozonesecure/.env
index cac418a..d634dca 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/.env
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/.env
@@ -15,3 +15,4 @@
 # limitations under the License.
 
 HDDS_VERSION=${hdds.version}
+HADOOP_VERSION=3
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
index 842076a..6045706 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
@@ -25,6 +25,15 @@ services:
     hostname: kdc
     volumes:
       - ../..:/opt/hadoop
+
+  kms:
+    image: apache/hadoop:${HADOOP_VERSION}
+    ports:
+      - 9600:9600
+    env_file:
+      - ./docker-config
+    command: ["hadoop", "kms"]
+
   datanode:
     build:
       context: docker-image/runner
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index 36f05ae..bf8eda9 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -43,6 +43,8 @@ HDFS-SITE.XML_dfs.datanode.http.address=0.0.0.0:1012
 CORE-SITE.XML_dfs.data.transfer.protection=authentication
 CORE-SITE.XML_hadoop.security.authentication=kerberos
 CORE-SITE.XML_hadoop.security.auth_to_local=RULE:[2:$1@$0](.*)s/.*/root/
+CORE-SITE.XML_hadoop.security.key.provider.path=kms://http@kms:9600/kms
+
 HDFS-SITE.XML_rpc.metrics.quantile.enable=true
 HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
 LOG4J.PROPERTIES_log4j.rootLogger=INFO, stdout
diff --git a/hadoop-ozone/integration-test/pom.xml b/hadoop-ozone/integration-test/pom.xml
index 1867d95..04f131b 100644
--- a/hadoop-ozone/integration-test/pom.xml
+++ b/hadoop-ozone/integration-test/pom.xml
@@ -82,6 +82,16 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-kms</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-kms</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
new file mode 100644
index 0000000..a52ab8c
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
+import org.apache.hadoop.crypto.key.kms.server.MiniKMS;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * This class is to test all the public facing APIs of Ozone Client.
+ */
+public class TestOzoneAtRestEncryption extends TestOzoneRpcClient {
+
+  private static MiniOzoneCluster cluster = null;
+  private static MiniKMS miniKMS;
+  private static OzoneClient ozClient = null;
+  private static ObjectStore store = null;
+  private static OzoneManager ozoneManager;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+
+  private static final String SCM_ID = UUID.randomUUID().toString();
+  private static File testDir;
+  private static OzoneConfiguration conf;
+  private static final String TEST_KEY = "key1";
+
+
+    /**
+     * Create a MiniOzoneCluster for testing.
+     * <p>
+     * Ozone is made active by setting OZONE_ENABLED = true
+     *
+     * @throws IOException
+     */
+  @BeforeClass
+  public static void init() throws Exception {
+    testDir = GenericTestUtils.getTestDir(
+        TestSecureOzoneRpcClient.class.getSimpleName());
+
+    File kmsDir = new File(testDir, UUID.randomUUID().toString());
+    Assert.assertTrue(kmsDir.mkdirs());
+    MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder();
+    miniKMS = miniKMSBuilder.setKmsConfDir(kmsDir).build();
+    miniKMS.start();
+
+    OzoneManager.setTestSecureOmFlag(true);
+    conf = new OzoneConfiguration();
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        getKeyProviderURI(miniKMS));
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true);
+    conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    CertificateClientTestImpl certificateClientTest =
+        new CertificateClientTestImpl(conf);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(10)
+        .setScmId(SCM_ID)
+        .setCertificateClient(certificateClientTest)
+        .build();
+    cluster.getOzoneManager().startSecretManager();
+    cluster.waitForClusterToBeReady();
+    ozClient = OzoneClientFactory.getRpcClient(conf);
+    store = ozClient.getObjectStore();
+    storageContainerLocationClient =
+        cluster.getStorageContainerLocationClient();
+    ozoneManager = cluster.getOzoneManager();
+    TestOzoneRpcClient.setCluster(cluster);
+    TestOzoneRpcClient.setOzClient(ozClient);
+    TestOzoneRpcClient.setOzoneManager(ozoneManager);
+    TestOzoneRpcClient.setStorageContainerLocationClient(
+        storageContainerLocationClient);
+    TestOzoneRpcClient.setStore(store);
+    TestOzoneRpcClient.setScmId(SCM_ID);
+
+    // create test key
+    createKey(TEST_KEY, cluster.getOzoneManager().getKmsProvider(), conf);
+  }
+
+
+
+    /**
+     * Close OzoneClient and shutdown MiniOzoneCluster.
+     */
+  @AfterClass
+  public static void shutdown() throws IOException {
+    if(ozClient != null) {
+      ozClient.close();
+    }
+
+    if (storageContainerLocationClient != null) {
+      storageContainerLocationClient.close();
+    }
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+
+    if (miniKMS != null) {
+      miniKMS.stop();
+    }
+  }
+
+  @Test
+  public void testPutKeyWithEncryption() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    long currentTime = Time.now();
+
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    BucketArgs bucketArgs = BucketArgs.newBuilder()
+        .setBucketEncryptionKey(TEST_KEY).build();
+    volume.createBucket(bucketName, bucketArgs);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    for (int i = 0; i < 1; i++) {
+      String keyName = UUID.randomUUID().toString();
+
+      try (OzoneOutputStream out = bucket.createKey(keyName,
+          value.getBytes("UTF-8").length, ReplicationType.STAND_ALONE,
+          ReplicationFactor.ONE, new HashMap<>())) {
+        out.write(value.getBytes("UTF-8"));
+      }
+
+      OzoneKey key = bucket.getKey(keyName);
+      Assert.assertEquals(keyName, key.getName());
+      byte[] fileContent;
+      int len = 0;
+
+      try(OzoneInputStream is = bucket.readKey(keyName)) {
+        fileContent = new byte[value.getBytes("UTF-8").length];
+        len = is.read(fileContent);
+      }
+
+      Assert.assertEquals(len, value.length());
+      Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
+          keyName, ReplicationType.STAND_ALONE,
+          ReplicationFactor.ONE));
+      Assert.assertEquals(value, new String(fileContent, "UTF-8"));
+      Assert.assertTrue(key.getCreationTime() >= currentTime);
+      Assert.assertTrue(key.getModificationTime() >= currentTime);
+    }
+  }
+
+  private boolean verifyRatisReplication(String volumeName, String bucketName,
+      String keyName, ReplicationType type, ReplicationFactor factor)
+      throws IOException {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .build();
+    HddsProtos.ReplicationType replicationType =
+        HddsProtos.ReplicationType.valueOf(type.toString());
+    HddsProtos.ReplicationFactor replicationFactor =
+        HddsProtos.ReplicationFactor.valueOf(factor.getValue());
+    OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
+    for (OmKeyLocationInfo info:
+        keyInfo.getLatestVersionLocations().getLocationList()) {
+      ContainerInfo container =
+          storageContainerLocationClient.getContainer(info.getContainerID());
+      if (!container.getReplicationFactor().equals(replicationFactor) || (
+          container.getReplicationType() != replicationType)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static String getKeyProviderURI(MiniKMS kms) {
+    return KMSClientProvider.SCHEME_NAME + "://" +
+        kms.getKMSUrl().toExternalForm().replace("://", "@");
+  }
+
+  private static void createKey(String keyName, KeyProvider
+      provider, Configuration config)
+      throws NoSuchAlgorithmException, IOException {
+    final KeyProvider.Options options = KeyProvider.options(config);
+    options.setDescription(keyName);
+    options.setBitLength(128);
+    provider.createKey(keyName, options);
+    provider.flush();
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
index c936354..b15e692 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
@@ -19,9 +19,15 @@ package org.apache.hadoop.ozone.om;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.util.Time;
@@ -42,6 +48,7 @@ public class BucketManagerImpl implements BucketManager {
    * OMMetadataManager is used for accessing OM MetadataDB and ReadWriteLock.
    */
   private final OMMetadataManager metadataManager;
+  private final KeyProviderCryptoExtension kmsProvider;
 
   /**
    * Constructs BucketManager.
@@ -49,7 +56,17 @@ public class BucketManagerImpl implements BucketManager {
    * @param metadataManager
    */
   public BucketManagerImpl(OMMetadataManager metadataManager) {
+    this(metadataManager, null);
+  }
+
+  public BucketManagerImpl(OMMetadataManager metadataManager,
+                           KeyProviderCryptoExtension kmsProvider) {
     this.metadataManager = metadataManager;
+    this.kmsProvider = kmsProvider;
+  }
+
+  KeyProviderCryptoExtension getKMSProvider() {
+    return kmsProvider;
   }
 
   /**
@@ -99,19 +116,47 @@ public class BucketManagerImpl implements BucketManager {
         throw new OMException("Bucket already exist",
             OMException.ResultCodes.BUCKET_ALREADY_EXISTS);
       }
-
-      OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
+      BucketEncryptionKeyInfo bek = bucketInfo.getEncryptionKeyInfo();
+      BucketEncryptionKeyInfo.Builder bekb = null;
+      if (bek != null) {
+        if (kmsProvider == null) {
+          throw new OMException("Invalid KMS provider, check configuration " +
+              CommonConfigurationKeys.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+              OMException.ResultCodes.INVALID_KMS_PROVIDER);
+        }
+        if (bek.getKeyName() == null) {
+          throw new OMException("Bucket encryption key needed.", OMException
+              .ResultCodes.BUCKET_ENCRYPTION_KEY_NOT_FOUND);
+        }
+        // Talk to KMS to retrieve the bucket encryption key info.
+        KeyProvider.Metadata metadata = getKMSProvider().getMetadata(
+            bek.getKeyName());
+        if (metadata == null) {
+          throw new OMException("Bucket encryption key " + bek.getKeyName()
+              + " doesn't exist.",
+              OMException.ResultCodes.BUCKET_ENCRYPTION_KEY_NOT_FOUND);
+        }
+        // If the provider supports pool for EDEKs, this will fill in the pool
+        kmsProvider.warmUpEncryptedKeys(bek.getKeyName());
+        bekb = new BucketEncryptionKeyInfo.Builder()
+            .setKeyName(bek.getKeyName())
+            .setVersion(CryptoProtocolVersion.ENCRYPTION_ZONES)
+            .setSuite(CipherSuite.convert(metadata.getCipher()));
+      }
+      OmBucketInfo.Builder omBucketInfoBuilder = OmBucketInfo.newBuilder()
           .setVolumeName(bucketInfo.getVolumeName())
           .setBucketName(bucketInfo.getBucketName())
           .setAcls(bucketInfo.getAcls())
           .setStorageType(bucketInfo.getStorageType())
           .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
           .setCreationTime(Time.now())
-          .addAllMetadata(bucketInfo.getMetadata())
-          .build();
-      metadataManager.getBucketTable().put(bucketKey,
-          omBucketInfo);
+          .addAllMetadata(bucketInfo.getMetadata());
 
+      if (bekb != null) {
+        omBucketInfoBuilder.setBucketEncryptionKey(bekb.build());
+      }
+      metadataManager.getBucketTable().put(bucketKey,
+          omBucketInfoBuilder.build());
       LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName);
     } catch (IOException | DBException ex) {
       if (!(ex instanceof OMException)) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index c9ac3f0..c6ee7a7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -20,16 +20,22 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.security.GeneralSecurityException;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -41,11 +47,8 @@ import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
-import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -58,6 +61,12 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .PartKeyInfo;
 import org.apache.hadoop.util.Time;
@@ -80,6 +89,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MA
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.util.Time.monotonicNow;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,9 +116,18 @@ public class KeyManagerImpl implements KeyManager {
 
   private BackgroundService keyDeletingService;
 
+  private final KeyProviderCryptoExtension kmsProvider;
+
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
       OzoneBlockTokenSecretManager secretManager) {
+    this(scmBlockClient, metadataManager, conf, omId, secretManager, null);
+  }
+
+  public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
+      OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
+      OzoneBlockTokenSecretManager secretManager,
+      KeyProviderCryptoExtension kmsProvider) {
     this.scmBlockClient = scmBlockClient;
     this.metadataManager = metadataManager;
     this.scmBlockSize = (long) conf
@@ -125,6 +144,7 @@ public class KeyManagerImpl implements KeyManager {
     this.grpcBlockTokenEnabled = conf.getBoolean(
         HDDS_BLOCK_TOKEN_ENABLED,
         HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
+    this.kmsProvider = kmsProvider;
   }
 
   @Override
@@ -144,6 +164,10 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
+  KeyProviderCryptoExtension getKMSProvider() {
+    return kmsProvider;
+  }
+
   @Override
   public void stop() throws IOException {
     if (keyDeletingService != null) {
@@ -152,6 +176,12 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
+  private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
+      throws IOException {
+    String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+    return metadataManager.getBucketTable().get(bucketKey);
+  }
+
   private void validateBucket(String volumeName, String bucketName)
       throws IOException {
     String volumeKey = metadataManager.getVolumeKey(volumeName);
@@ -259,6 +289,30 @@ public class KeyManagerImpl implements KeyManager {
     return EnumSet.allOf(AccessModeProto.class);
   }
 
+  private EncryptedKeyVersion generateEDEK(
+      final String ezKeyName) throws IOException {
+    if (ezKeyName == null) {
+      return null;
+    }
+    long generateEDEKStartTime = monotonicNow();
+    EncryptedKeyVersion edek = SecurityUtil.doAsLoginUser(
+        new PrivilegedExceptionAction<EncryptedKeyVersion>() {
+          @Override
+          public EncryptedKeyVersion run() throws IOException {
+            try {
+              return getKMSProvider().generateEncryptedKey(ezKeyName);
+            } catch (GeneralSecurityException e) {
+              throw new IOException(e);
+            }
+          }
+        });
+    long generateEDEKTime = monotonicNow() - generateEDEKStartTime;
+    LOG.debug("generateEDEK takes {} ms", generateEDEKTime);
+    Preconditions.checkNotNull(edek);
+    return edek;
+  }
+
+  @SuppressWarnings("checkstyle:methodlength")
   @Override
   public OpenKeySession openKey(OmKeyArgs args) throws IOException {
     Preconditions.checkNotNull(args);
@@ -272,6 +326,24 @@ public class KeyManagerImpl implements KeyManager {
     ReplicationType type = args.getType();
     long currentTime = Time.monotonicNowNanos();
 
+    FileEncryptionInfo encInfo = null;
+    OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
+    BucketEncryptionKeyInfo ezInfo = bucketInfo.getEncryptionKeyInfo();
+    if (ezInfo != null) {
+      if (getKMSProvider() == null) {
+        throw new OMException("Invalid KMS provider, check configuration " +
+            CommonConfigurationKeys.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+            OMException.ResultCodes.INVALID_KMS_PROVIDER);
+      }
+
+      final String ezKeyName = ezInfo.getKeyName();
+      EncryptedKeyVersion edek = generateEDEK(ezKeyName);
+      encInfo = new FileEncryptionInfo(ezInfo.getSuite(), ezInfo.getVersion(),
+            edek.getEncryptedKeyVersion().getMaterial(),
+            edek.getEncryptedKeyIv(),
+            ezKeyName, edek.getEncryptionKeyVersionName());
+    }
+
     try {
       if (args.getIsMultipartKey()) {
         Preconditions.checkArgument(args.getMultipartUploadPartNumber() > 0,
@@ -356,7 +428,7 @@ public class KeyManagerImpl implements KeyManager {
       if (args.getIsMultipartKey()) {
         // For this upload part we don't need to check in KeyTable. As this
         // is not an actual key, it is a part of the key.
-        keyInfo = createKeyInfo(args, locations, factor, type, size);
+        keyInfo = createKeyInfo(args, locations, factor, type, size, encInfo);
         //TODO args.getMetadata
         openVersion = 0;
       } else {
@@ -370,7 +442,7 @@ public class KeyManagerImpl implements KeyManager {
         } else {
           // the key does not exist, create a new object, the new blocks are the
           // version 0
-          keyInfo = createKeyInfo(args, locations, factor, type, size);
+          keyInfo = createKeyInfo(args, locations, factor, type, size, encInfo);
           openVersion = 0;
         }
       }
@@ -412,12 +484,14 @@ public class KeyManagerImpl implements KeyManager {
    * @param factor
    * @param type
    * @param size
+   * @param encInfo
    * @return
    */
   private OmKeyInfo createKeyInfo(OmKeyArgs keyArgs,
                                   List<OmKeyLocationInfo> locations,
                                   ReplicationFactor factor,
-                                  ReplicationType type, long size) {
+                                  ReplicationType type, long size,
+                                  FileEncryptionInfo encInfo) {
     return new OmKeyInfo.Builder()
         .setVolumeName(keyArgs.getVolumeName())
         .setBucketName(keyArgs.getBucketName())
@@ -429,6 +503,7 @@ public class KeyManagerImpl implements KeyManager {
         .setDataSize(size)
         .setReplicationType(type)
         .setReplicationFactor(factor)
+        .setFileEncryptionInfo(encInfo)
         .build();
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 0c1a52f..3104de3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -28,6 +28,9 @@ import java.security.KeyPair;
 import java.util.Collection;
 import java.util.Objects;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
@@ -114,6 +117,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.KMSUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
@@ -240,6 +244,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private final S3SecretManager s3SecretManager;
   private volatile boolean isOmRpcServerRunning = false;
 
+  private KeyProviderCryptoExtension kmsProvider = null;
+  private static String keyProviderUriKeyName =
+      CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
+
   private OzoneManager(OzoneConfiguration conf) throws IOException,
       AuthenticationException {
     super(OzoneVersionInfo.OZONE_VERSION_INFO);
@@ -293,13 +301,22 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
     metadataManager = new OmMetadataManagerImpl(configuration);
     volumeManager = new VolumeManagerImpl(metadataManager, configuration);
-    bucketManager = new BucketManagerImpl(metadataManager);
+
+    // Create the KMS Key Provider
+    try {
+      kmsProvider = createKeyProviderExt(configuration);
+    } catch (IOException ioe) {
+      kmsProvider = null;
+      LOG.error("Fail to create Key Provider");
+    }
+
+    bucketManager = new BucketManagerImpl(metadataManager, getKmsProvider());
     metrics = OMMetrics.create();
 
     s3BucketManager = new S3BucketManagerImpl(configuration, metadataManager,
         volumeManager, bucketManager);
     keyManager = new KeyManagerImpl(scmBlockClient, metadataManager,
-        configuration, omStorage.getOmId(), blockTokenMgr);
+        configuration, omStorage.getOmId(), blockTokenMgr, getKmsProvider());
     s3SecretManager = new S3SecretManagerImpl(configuration, metadataManager);
 
     shutdownHook = () -> {
@@ -468,6 +485,18 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         NetUtils.getHostPortString(rpcAddress));
   }
 
+  private KeyProviderCryptoExtension createKeyProviderExt(
+      OzoneConfiguration conf) throws IOException {
+    KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
+        keyProviderUriKeyName);
+    if (keyProvider == null) {
+      return null;
+    }
+    KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
+        .createKeyProviderCryptoExtension(keyProvider);
+    return cryptoProvider;
+  }
+
   /**
    * Returns an instance of {@link IAccessAuthorizer}.
    * Looks up the configuration to see if there is custom class specified.
@@ -979,6 +1008,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  @VisibleForTesting
+  public KeyProviderCryptoExtension getKmsProvider() {
+    return kmsProvider;
+  }
   /**
    * Get metadata manager.
    *
@@ -1003,7 +1036,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     DefaultMetricsSystem.initialize("OzoneManager");
 
     metadataManager.start(configuration);
-    startSecretManagerIfNecessary();
+    // TODO: uncomment this with HDDS-134 to avoid NPE
+    //startSecretManagerIfNecessary();
 
     // Set metrics and start metrics back ground thread
     metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/bucket/CreateBucketHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/bucket/CreateBucketHandler.java
index 88b5176..f9c3591 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/bucket/CreateBucketHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/bucket/CreateBucketHandler.java
@@ -17,18 +17,16 @@
  */
 package org.apache.hadoop.ozone.web.ozShell.bucket;
 
-import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
-import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.ozone.client.*;
 import org.apache.hadoop.ozone.web.ozShell.Handler;
 import org.apache.hadoop.ozone.web.ozShell.OzoneAddress;
 import org.apache.hadoop.ozone.web.ozShell.Shell;
 import org.apache.hadoop.ozone.web.utils.JsonUtils;
 
 import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
 import picocli.CommandLine.Parameters;
-
 /**
  * create bucket handler.
  */
@@ -39,6 +37,10 @@ public class CreateBucketHandler extends Handler {
   @Parameters(arity = "1..1", description = Shell.OZONE_BUCKET_URI_DESCRIPTION)
   private String uri;
 
+  @Option(names = {"--bucketkey", "-k"},
+      description = "bucket encryption key name")
+  private String bekName;
+
   /**
    * Executes create bucket.
    */
@@ -52,13 +54,31 @@ public class CreateBucketHandler extends Handler {
     String volumeName = address.getVolumeName();
     String bucketName = address.getBucketName();
 
+    BucketArgs.Builder bb = new BucketArgs.Builder()
+        .setStorageType(StorageType.DEFAULT)
+        .setVersioning(false);
+
+    if (bekName != null) {
+      if (!bekName.isEmpty()) {
+        bb.setBucketEncryptionKey(bekName);
+      } else {
+        throw new IllegalArgumentException("Bucket encryption key name must " +
+            "be specified to enable bucket encryption!");
+      }
+    }
+
     if (isVerbose()) {
       System.out.printf("Volume Name : %s%n", volumeName);
       System.out.printf("Bucket Name : %s%n", bucketName);
+      if (bekName != null) {
+        bb.setBucketEncryptionKey(bekName);
+        System.out.printf("Bucket Encryption enabled with Key Name: %s%n",
+            bekName);
+      }
     }
 
     OzoneVolume vol = client.getObjectStore().getVolume(volumeName);
-    vol.createBucket(bucketName);
+    vol.createBucket(bucketName, bb.build());
 
     if (isVerbose()) {
       OzoneBucket bucket = vol.getBucket(bucketName);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
index 248c2d1..7fa72fd 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -29,10 +31,7 @@ import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.*;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -40,6 +39,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
+import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 
 /**
@@ -105,6 +105,40 @@ public class TestBucketManagerImpl {
-  public void testCreateBucket() throws Exception {
+  public void testCreateEncryptedBucket() throws Exception {
     OmMetadataManagerImpl metaMgr = createSampleVol();
 
+    KeyProviderCryptoExtension kmsProvider = Mockito.mock(
+        KeyProviderCryptoExtension.class);
+    String testBekName = "key1";
+    String testCipherName = "AES/CTR/NoPadding";
+
+    KeyProvider.Metadata mockMetadata = Mockito.mock(KeyProvider.Metadata
+        .class);
+    Mockito.when(kmsProvider.getMetadata(testBekName)).thenReturn(mockMetadata);
+    Mockito.when(mockMetadata.getCipher()).thenReturn(testCipherName);
+
+    BucketManager bucketManager = new BucketManagerImpl(metaMgr,
+        kmsProvider);
+    OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName("sampleVol")
+        .setBucketName("bucketOne")
+        .setBucketEncryptionKey(new
+            BucketEncryptionKeyInfo.Builder().setKeyName(testBekName).build())
+        .build();
+    bucketManager.createBucket(bucketInfo);
+    Assert.assertNotNull(bucketManager.getBucketInfo("sampleVol", "bucketOne"));
+
+    OmBucketInfo bucketInfoRead =
+        bucketManager.getBucketInfo("sampleVol", "bucketOne");
+
+    Assert.assertEquals(bucketInfo.getEncryptionKeyInfo().getKeyName(),
+        bucketInfoRead.getEncryptionKeyInfo().getKeyName());
+    metaMgr.getStore().close();
+  }
+
+
+  @Test
+  public void testCreateBucket() throws Exception {
+    OmMetadataManagerImpl metaMgr = createSampleVol();
+
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 2df3b56..399504c 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -108,7 +108,7 @@ public class OzoneBucketStub extends OzoneBucket {
                 size,
                 System.currentTimeMillis(),
                 System.currentTimeMillis(),
-                new ArrayList<>(), type, metadata
+                new ArrayList<>(), type, metadata, null
             ));
             super.close();
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org