You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by me...@apache.org on 2019/09/29 08:40:26 UTC

[ranger] branch master updated: RANGER-2497 : Support Azure Key Vault for storing master keys of Ranger KMS

This is an automated email from the ASF dual-hosted git repository.

mehul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new 02640d3  RANGER-2497 : Support Azure Key Vault for storing master keys of Ranger KMS
02640d3 is described below

commit 02640d3cf520ee5a0dae13dd46b91ff266359e77
Author: Dhaval Shah <dh...@gmail.com>
AuthorDate: Mon Aug 12 14:40:14 2019 +0530

    RANGER-2497 : Support Azure Key Vault for storing master keys of Ranger KMS
    
    Signed-off-by: Mehul Parikh <me...@apache.org>
---
 LICENSE.txt                                        |    3 +-
 NOTICE.txt                                         |    1 +
 kms/config/kms-webapp/dbks-site.xml                |   57 +
 kms/pom.xml                                        |   50 +
 kms/scripts/DBMKTOAZUREKEYVAULT.sh                 |   25 +
 kms/scripts/install.properties                     |   16 +
 kms/scripts/setup.sh                               |   92 ++
 .../key/AzureKeyVaultClientAuthenticator.java      |  260 +++++
 .../hadoop/crypto/key/DBToAzureKeyVault.java       |  220 ++++
 .../apache/hadoop/crypto/key/JKS2RangerUtil.java   |  272 +++--
 .../apache/hadoop/crypto/key/Ranger2JKSUtil.java   |  253 +++--
 .../apache/hadoop/crypto/key/RangerKeyStore.java   |  788 ++++++++++----
 .../hadoop/crypto/key/RangerKeyStoreProvider.java  | 1111 ++++++++++++--------
 .../crypto/key/RangerKeyVaultKeyGenerator.java     |  189 ++++
 pom.xml                                            |   13 +
 src/main/assembly/kms.xml                          |   56 +-
 16 files changed, 2547 insertions(+), 859 deletions(-)

diff --git a/LICENSE.txt b/LICENSE.txt
index a424ebe..12a4ead 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -217,7 +217,8 @@ The Apache Ranger project bundles the following files under the MIT License:
 This product includes jQuery (http://jquery.org ), Copyright © 2014, John Resig.
 This product includes jQuery UI (http://jqueryui.com ), Copyright © 2013 jQuery Foundation.  
 This product includes Backbone (http://backbonejs.org ), Copyright © 2010-2014 Jeremy Ashkenas, DocumentCloud.
-This product includes underscore (http:underscorejs.org ), Copyright © 2009-2014 Jeremy Ashkenas, DocumentCloud and Investigative Reporters & Editors.  
+This product includes underscore (http:underscorejs.org ), Copyright © 2009-2014 Jeremy Ashkenas, DocumentCloud and Investigative Reporters & Editors.
+This product includes Azure/azure-sdk-for-java (https://github.com/Azure/azure-sdk-for-java/blob/master/LICENSE.txt ), Copyright (c) 2015 Microsoft.  
 This product includes Backbone.Marionette (http://marionettejs.com/ ) Copyright (c)2013 Derick Bailey, Muted Solutions, LLC.  
 This product includes Backbone.Wreqr (http://marionettejs.com/ ) Copyright ©,2012 Derick Bailey, Muted Solutions, LLC.
 This product includes Backbone.BabySitter (http://marionettejs.com/ ), Copyright ©2013 Derick Bailey, Muted Solutions, LLC.  
diff --git a/NOTICE.txt b/NOTICE.txt
index a82c1f0..b315087 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -3,3 +3,4 @@ Copyright 2014-2019 The Apache Software Foundation
 
 This product includes software developed at The Apache Software Foundation (http://www.apache.org/).
 This product includes software developed by Spring Security Project (http://www.springframework.org/security)
+This product includes Azure/azure-sdk-for-java (https://github.com/Azure/azure-sdk-for-java/blob/master/LICENSE.txt ), Copyright (c) 2015 Microsoft.
diff --git a/kms/config/kms-webapp/dbks-site.xml b/kms/config/kms-webapp/dbks-site.xml
index 05a1a13..e9cafbc 100755
--- a/kms/config/kms-webapp/dbks-site.xml
+++ b/kms/config/kms-webapp/dbks-site.xml
@@ -231,6 +231,63 @@
   </property>
 
   <!-- Key-Secure Config END-->
+   <!--Azure Key Vault START-->
+   <property>
+        <name>ranger.kms.azurekeyvault.enabled</name>
+        <value>false</value>
+        <description>Flag for Azure Key Vault</description>
+  </property>
+  <property>
+        <name>ranger.kms.azure.keyvault.ssl.enabled</name>
+        <value>false</value>
+        <description>Flag for Azure authentication via certificate or password</description>
+  </property>
+  <property>
+        <name>ranger.kms.azure.client.id</name>
+        <value></value>
+        <description>Azure Client Id</description>
+  </property>
+  <property>
+        <name>ranger.kms.azure.client.secret</name>
+        <value></value>
+        <description>Azure Client Secret</description>
+  </property>
+    <property>
+        <name>ranger.kms.azure.client.secret.alias</name>
+        <value>ranger.ks.azure.client.secret</value>
+        <description>Azure Client Secret Alias</description>
+  </property>
+  <property>
+        <name>ranger.kms.azure.keyvault.certificate.path</name>
+        <value>/home/machine/Desktop/azureAuthCertificate/keyvault-MyCert.pfx</value>
+        <description>Azure key vault cerificate path</description>
+  </property>
+  <property>
+        <name>ranger.kms.azure.keyvault.certificate.password</name>
+        <value></value>
+        <description>Azure key vault cerificate password</description>
+  </property>
+   <property>
+        <name>ranger.kms.azure.masterkey.name</name>
+        <value></value>
+        <description>Azure master key name</description>
+  </property>
+  <property>
+        <name>ranger.kms.azure.masterkey.type</name>
+        <value></value>
+        <description>Azure key type: RSA, RSA_HSM, EC, EC_HSM</description>
+  </property>
+    <property>
+        <name>ranger.kms.azure.zonekey.encryption.algorithm</name>
+        <value></value>
+        <description>Encryption Algo : RSA_OAEP, RSA_OAEP_256, RSA1_5</description>
+  </property>
+   <property>
+        <name>ranger.kms.azurekeyvault.url</name>
+        <value></value>
+        <description>Azure Key Vault url</description>
+  </property>
+   <!--Azure Key Vault END-->
     
   <!-- HSM Config -->
   <property>
diff --git a/kms/pom.xml b/kms/pom.xml
index 34b383c..12c0002 100644
--- a/kms/pom.xml
+++ b/kms/pom.xml
@@ -272,6 +272,56 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+		    <groupId>com.microsoft.azure</groupId>
+		    <artifactId>azure</artifactId>
+		    <version>${com.microsoft.azure.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.microsoft.azure</groupId>
+			<artifactId>azure-keyvault</artifactId>
+			<version>${com.microsoft.azure.azure-keyvault.version}</version>
+		</dependency>
+		<dependency>
+		    <groupId>com.microsoft.azure</groupId>
+		    <artifactId>azure-mgmt-keyvault</artifactId>
+		    <version>${com.microsoft.azure.azure-mgmt-keyvault.version}</version>
+		</dependency>
+		<dependency>
+		    <groupId>com.microsoft.rest</groupId>
+		    <artifactId>client-runtime</artifactId>
+		    <version>${com.microsoft.rest.client-runtime.version}</version>
+		</dependency>
+		<dependency>
+		    <groupId>com.microsoft.azure</groupId>
+		    <artifactId>azure-client-runtime</artifactId>
+		    <version>${com.microsoft.azure.azure-client-runtime.version}</version>
+		</dependency>
+		<dependency>
+		    <groupId>com.microsoft.azure</groupId>
+   			<artifactId>adal4j</artifactId>
+    		<version>${com.microsoft.azure.adal4j.version}</version>
+		</dependency>
+		<dependency>
+		    <groupId>io.reactivex</groupId>
+		    <artifactId>rxjava</artifactId>
+		    <version>${io.reactivex.rxjava.version}</version>
+		</dependency>
+		<dependency>
+		    <groupId>net.minidev</groupId>
+		    <artifactId>asm</artifactId>
+		    <version>${net.minidev.asm.version}</version>
+		</dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <version>${org.bouncycastle.bcprov-jdk15on}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk15on</artifactId>
+            <version>${org.bouncycastle.bcpkix-jdk15on}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.directory.api</groupId>
             <artifactId>api-i18n</artifactId>
             <version>1.0.0-M20</version>
diff --git a/kms/scripts/DBMKTOAZUREKEYVAULT.sh b/kms/scripts/DBMKTOAZUREKEYVAULT.sh
new file mode 100644
index 0000000..cfe5a6b
--- /dev/null
+++ b/kms/scripts/DBMKTOAZUREKEYVAULT.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# -------------------------------------------------------------------------------------
+if [ "$JAVA_HOME" != "" ]; then
+    export PATH=$JAVA_HOME/bin:$PATH
+else
+    exit ;
+fi
+
+RANGER_KMS_HOME=`dirname $0`
+cp="${RANGER_KMS_HOME}/cred/lib/*:${RANGER_KMS_HOME}/./ews/webapp/WEB-INF/classes/conf/:${RANGER_KMS_HOME}/ews/webapp/config:${RANGER_KMS_HOME}/ews/lib/*:${RANGER_KMS_HOME}/ews/webapp/lib/*:${RANGER_KMS_HOME}/ews/webapp/META-INF"
+java -cp "${cp}" org.apache.hadoop.crypto.key.DBToAzureKeyVault ${1} ${2} ${3} ${4} ${5} ${6} ${7} ${8} ${9}
diff --git a/kms/scripts/install.properties b/kms/scripts/install.properties
index 798dd8c..a30b1d3 100755
--- a/kms/scripts/install.properties
+++ b/kms/scripts/install.properties
@@ -99,6 +99,22 @@ KEYSECURE_HOSTNAME=SunPKCS11-keysecurehn
 KEYSECURE_MASTER_KEY_SIZE=256
 KEYSECURE_LIB_CONFIG_PATH=/opt/safenetConf/64/8.3.1/sunpkcs11.cfg
 
+#------------------------- Ranger Azure Key Vault ------------------------------
+AZURE_KEYVAULT_ENABLED=false
+AZURE_KEYVAULT_SSL_ENABLED=false
+AZURE_CLIENT_ID=50fd7ca6-fd4f-4785-a13f-1a6cc4e95e42
+AZURE_CLIENT_SECRET=<AzureKeyVaultPassword>
+AZURE_AUTH_KEYVAULT_CERTIFICATE_PATH=/home/machine/Desktop/azureAuthCertificate/keyvault-MyCert.pfx
+# Initialize below prop if your certificate file has any password
+#AZURE_AUTH_KEYVAULT_CERTIFICATE_PASSWORD=certPass
+AZURE_MASTERKEY_NAME=RangerMasterKey
+# E.G. RSA, RSA_HSM, EC, EC_HSM, OCT
+AZURE_MASTER_KEY_TYPE=RSA
+# E.G. RSA_OAEP, RSA_OAEP_256, RSA1_5, RSA_OAEP 
+ZONE_KEY_ENCRYPTION_ALGO=RSA_OAEP
+AZURE_KEYVAULT_URL=https://shahkeyvault.vault.azure.net/
+
+
 #
 # ------- UNIX User CONFIG ----------------
 #
diff --git a/kms/scripts/setup.sh b/kms/scripts/setup.sh
index 68f1f3f..9b4df38 100755
--- a/kms/scripts/setup.sh
+++ b/kms/scripts/setup.sh
@@ -100,6 +100,17 @@ KEYSECURE_HOSTNAME=$(get_prop 'KEYSECURE_HOSTNAME' $PROPFILE)
 KEYSECURE_MASTER_KEY_SIZE=$(get_prop 'KEYSECURE_MASTER_KEY_SIZE' $PROPFILE)
 KEYSECURE_LIB_CONFIG_PATH=$(get_prop 'KEYSECURE_LIB_CONFIG_PATH' $PROPFILE)
 
+AZURE_KEYVAULT_ENABLED=$(get_prop 'AZURE_KEYVAULT_ENABLED' $PROPFILE)
+AZURE_KEYVAULT_SSL_ENABLED=$(get_prop 'AZURE_KEYVAULT_SSL_ENABLED' $PROPFILE)
+AZURE_CLIENT_ID=$(get_prop 'AZURE_CLIENT_ID' $PROPFILE)
+AZURE_CLIENT_SECRET=$(get_prop 'AZURE_CLIENT_SECRET' $PROPFILE)
+AZURE_AUTH_KEYVAULT_CERTIFICATE_PATH=$(get_prop 'AZURE_AUTH_KEYVAULT_CERTIFICATE_PATH' $PROPFILE)
+AZURE_AUTH_KEYVAULT_CERTIFICATE_PASSWORD=$(get_prop 'AZURE_AUTH_KEYVAULT_CERTIFICATE_PASSWORD' $PROPFILE)
+AZURE_MASTERKEY_NAME=$(get_prop 'AZURE_MASTERKEY_NAME' $PROPFILE)
+AZURE_MASTER_KEY_TYPE=$(get_prop 'AZURE_MASTER_KEY_TYPE' $PROPFILE)
+ZONE_KEY_ENCRYPTION_ALGO=$(get_prop 'ZONE_KEY_ENCRYPTION_ALGO' $PROPFILE)
+AZURE_KEYVAULT_URL=$(get_prop 'AZURE_KEYVAULT_URL' $PROPFILE)
+
 kms_principal=$(get_prop 'kms_principal' $PROPFILE)
 kms_keytab=$(get_prop 'kms_keytab' $PROPFILE)
 hadoop_conf=$(get_prop 'hadoop_conf' $PROPFILE)
@@ -224,6 +235,16 @@ password_validation_safenet_keysecure(){
         fi
 }
 
+azure_client_secret_validation(){
+        if [ -z "$1" ]
+        then
+                log "[I] Blank password is not allowed for" $2". Please enter valid password."
+                exit 1
+        else
+                log "[I]" $2 "password validated."
+        fi
+}
+
 init_variables(){
 	curDt=`date '+%Y%m%d%H%M%S'`
 
@@ -571,9 +592,13 @@ update_properties() {
         KEYSECURE_PASSWD="ranger.kms.keysecure.login.password"
         KEYSECURE_PASSWORD_ALIAS="ranger.ks.login.password"
 
+	AZURE_CLIENT_SEC="ranger.kms.azure.client.secret"
+	AZURE_CLIENT_SECRET_ALIAS="ranger.ks.azure.client.secret"
+
 
         HSM_ENABLED=`echo $HSM_ENABLED | tr '[:lower:]' '[:upper:]'`
         KEYSECURE_ENABLED=`echo $KEYSECURE_ENABLED | tr '[:lower:]' '[:upper:]'`
+	AZURE_KEYVAULT_ENABLED=`echo $AZURE_KEYVAULT_ENABLED | tr '[:lower:]' '[:upper:]'`
 
 	if [ "${keystore}" != "" ]
 	then
@@ -613,6 +638,20 @@ update_properties() {
                         updatePropertyToFilePy $propertyName $newPropertyValue $to_file
                 fi
 
+		if [ "${AZURE_KEYVAULT_ENABLED}" == "TRUE" ]
+                then
+                        azure_client_secret_validation "$AZURE_CLIENT_SECRET" "Azure Client Secret"
+                        $PYTHON_COMMAND_INVOKER ranger_credential_helper.py -l "cred/lib/*" -f "$keystore" -k "${AZURE_CLIENT_SECRET_ALIAS}" -v "${AZURE_CLIENT_SECRET}" -c 1
+
+                        propertyName=ranger.kms.azure.client.secret.alias
+                        newPropertyValue="${AZURE_CLIENT_SECRET_ALIAS}"
+                        updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
+                        propertyName=ranger.kms.azure.client.secret
+                        newPropertyValue="_"
+                        updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+                fi
+
 
 		propertyName=ranger.ks.jpa.jdbc.credential.alias
 		newPropertyValue="${DB_CREDENTIAL_ALIAS}"
@@ -650,6 +689,10 @@ update_properties() {
                 newPropertyValue="${KEYSECURE_PASSWORD}"
                 updatePropertyToFilePy $propertyName $newPropertyValue $to_file
 
+		propertyName="${AZURE_CLIENT_SEC}"
+                newPropertyValue="${AZURE_CLIENT_SECRET}"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
 	fi
 
 	if test -f $keystore; then
@@ -753,6 +796,55 @@ update_properties() {
 
         fi
 
+	########### AZURE KEY VAULT #################
+
+
+        if [ "${AZURE_KEYVAULT_ENABLED}" != "TRUE" ]
+        then
+                propertyName=ranger.kms.azurekeyvault.enabled
+                newPropertyValue="false"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+        else
+                propertyName=ranger.kms.azurekeyvault.enabled
+                newPropertyValue="true"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
+		propertyName=ranger.kms.azure.keyvault.ssl.enabled
+                newPropertyValue="${AZURE_KEYVAULT_SSL_ENABLED}"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
+                propertyName=ranger.kms.azure.client.id
+                newPropertyValue="${AZURE_CLIENT_ID}"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
+		propertyName=ranger.kms.azure.keyvault.certificate.path
+                newPropertyValue="${AZURE_AUTH_KEYVAULT_CERTIFICATE_PATH}"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
+		propertyName=ranger.kms.azure.keyvault.certificate.password
+                newPropertyValue="${AZURE_AUTH_KEYVAULT_CERTIFICATE_PASSWORD}"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
+
+                propertyName=ranger.kms.azure.masterkey.name
+                newPropertyValue="${AZURE_MASTERKEY_NAME}"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
+		propertyName=ranger.kms.azure.masterkey.type
+                newPropertyValue="${AZURE_MASTER_KEY_TYPE}"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
+                propertyName=ranger.kms.azure.zonekey.encryption.algorithm
+                newPropertyValue="${ZONE_KEY_ENCRYPTION_ALGO}"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
+                propertyName=ranger.kms.azurekeyvault.url
+                newPropertyValue="${AZURE_KEYVAULT_URL}"
+                updatePropertyToFilePy $propertyName $newPropertyValue $to_file
+
+        fi
+
+
 	to_file_kms_site=$PWD/ews/webapp/WEB-INF/classes/conf/ranger-kms-site.xml
     if test -f $to_file_kms_site; then
 		log "[I] $to_file_kms_site file found"
diff --git a/kms/src/main/java/org/apache/hadoop/crypto/key/AzureKeyVaultClientAuthenticator.java b/kms/src/main/java/org/apache/hadoop/crypto/key/AzureKeyVaultClientAuthenticator.java
new file mode 100644
index 0000000..1933589
--- /dev/null
+++ b/kms/src/main/java/org/apache/hadoop/crypto/key/AzureKeyVaultClientAuthenticator.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.crypto.key;
+
+import com.microsoft.aad.adal4j.AsymmetricKeyCredential;
+import com.microsoft.aad.adal4j.AuthenticationContext;
+import com.microsoft.aad.adal4j.AuthenticationResult;
+import com.microsoft.aad.adal4j.ClientCredential;
+import com.microsoft.azure.keyvault.KeyVaultClient;
+import com.microsoft.azure.keyvault.authentication.KeyVaultCredentials;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PrivateKey;
+import java.security.Security;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Enumeration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.log4j.Logger;
+import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.openssl.PEMParser;
+import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
+import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
+import org.bouncycastle.operator.InputDecryptorProvider;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
+import org.bouncycastle.pkcs.PKCSException;
+
+public class AzureKeyVaultClientAuthenticator extends KeyVaultCredentials {
+	static final Logger logger = Logger.getLogger(AzureKeyVaultClientAuthenticator.class);
+	
+	private String authClientID;
+    private String authClientSecret;
+    
+    public AzureKeyVaultClientAuthenticator(String clientID, String clientSecret) {
+        this.authClientID = clientID;
+        this.authClientSecret = clientSecret;
+    }
+    
+    public AzureKeyVaultClientAuthenticator(String clientID) {
+        this.authClientID = clientID;
+    }
+    
+    /**
+     * It does the authentication. This method will be called by the super
+     * class.
+     * 
+     * @param request
+     *            The request being sent
+     * @param challenge
+     *            Information about the challenge from the service.
+     */
+    @Override
+    public String doAuthenticate(String authorization, String resource, String scope) {
+        AuthenticationResult token = getAccessTokenFromClientCredentials(
+				        authorization, resource, authClientID, authClientSecret);
+        return token.getAccessToken();
+    }
+    
+    private static AuthenticationResult getAccessTokenFromClientCredentials(
+            String authorization, String resource, String clientId, String clientKey) {
+        AuthenticationContext context = null;
+        AuthenticationResult result = null;
+        ExecutorService service = null;
+        try {
+            service = Executors.newFixedThreadPool(1);
+            context = new AuthenticationContext(authorization, false, service);
+            ClientCredential credentials = new ClientCredential(clientId, clientKey);
+            Future<AuthenticationResult> future = context.acquireToken(
+                    resource, credentials, null);
+            result = future.get();
+		} catch (Exception e) {
+			throw new RuntimeException(
+					" Error while getting Access token for client id: "
+							+ clientId + " and client secret. Error : " + e);
+		} finally {
+            service.shutdown();
+        }
+
+        if (result == null) {
+            throw new RuntimeException("authentication result was null");
+        }
+        return result;
+    }
+    
+    /**
+	 * Do certificate based authentication using pfx file
+	 * 
+	 * @param path
+	 *            to pfx/pem file
+	 * @param pfxPassword
+	 *            the password to the pfx file, this can be empty if thats the value
+	 *            given when it was created
+	 * @param clientId
+	 *            also known as applicationId which is received after app
+	 *            registration
+     * @throws Exception 
+	 */
+	public KeyVaultClient getAuthentication(String path, String certPassword)
+			throws Exception {
+		KeyCert keyCert = null;
+		if(path.endsWith(".pfx")){
+			try {
+				keyCert = readPfx(path, certPassword);
+			} catch (Exception ex) {
+				throw new Exception(
+						"Error while parsing pfx certificate. Error : " + ex);
+			}
+		}else if(path.endsWith(".pem")){
+			try {
+				keyCert = readPem(path, certPassword);
+			} catch (Exception ex) {
+				throw new Exception(
+						"Error while parsing pem certificate. Error : " + ex);
+			}
+		}
+		final KeyCert certificateKey = keyCert;
+		if (certificateKey != null) {
+			PrivateKey privateKey = certificateKey.getKey();
+
+			// Do certificate based authentication
+			KeyVaultClient keyVaultClient = new KeyVaultClient(
+					new KeyVaultCredentials() {
+
+						@Override
+						public String doAuthenticate(String authorization,
+								String resource, String scope) {
+							AuthenticationContext context;
+							try {
+								context = new AuthenticationContext(
+										authorization, false, Executors
+												.newFixedThreadPool(1));
+								AsymmetricKeyCredential asymmetricKeyCredential = AsymmetricKeyCredential
+										.create(authClientID, privateKey,
+												certificateKey.getCertificate());
+								AuthenticationResult result = context
+										.acquireToken(resource,
+												asymmetricKeyCredential, null)
+										.get();
+								return result.getAccessToken();
+							} catch (Exception e) {
+								throw new RuntimeException("Error while getting authenticated access token from azure key vault with certificate : " + e);
+							}
+						}
+					});
+			return keyVaultClient;
+		}
+		return null;
+	}
+	
+	private KeyCert readPem(String path, String password) throws IOException, CertificateException, OperatorCreationException, PKCSException {
+		Security.addProvider(new BouncyCastleProvider());
+		PEMParser pemParser = new PEMParser(new FileReader(new File(path)));
+		PrivateKey privateKey = null;
+		X509Certificate cert = null;
+		Object object = pemParser.readObject();
+		
+		while (object != null) {
+			JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
+			if (object instanceof X509CertificateHolder) {
+				cert = new JcaX509CertificateConverter().getCertificate((X509CertificateHolder) object);
+			}
+			if (object instanceof PKCS8EncryptedPrivateKeyInfo) {
+				PKCS8EncryptedPrivateKeyInfo pinfo = (PKCS8EncryptedPrivateKeyInfo) object;
+				InputDecryptorProvider provider = new JceOpenSSLPKCS8DecryptorProviderBuilder().build(password.toCharArray());
+				PrivateKeyInfo info = pinfo.decryptPrivateKeyInfo(provider);
+				privateKey = converter.getPrivateKey(info);
+			} 
+			if (object instanceof PrivateKeyInfo) {
+				privateKey = converter.getPrivateKey((PrivateKeyInfo) object);
+			}
+			object = pemParser.readObject();
+		}
+		KeyCert keycert = new KeyCert();
+		keycert.setCertificate(cert);
+		keycert.setKey(privateKey);
+		pemParser.close();
+		return keycert;
+	}
+	
+	private KeyCert readPfx(String path, String password) throws NoSuchProviderException, KeyStoreException,
+			IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {
+		try (FileInputStream stream = new FileInputStream(path)) {
+			KeyCert keyCert = new KeyCert();
+			boolean isAliasWithPrivateKey = false;
+			final KeyStore store = KeyStore.getInstance("pkcs12", "SunJSSE");
+			store.load((InputStream) stream, password.toCharArray());
+
+			// Iterate over all aliases to find the private key
+			Enumeration<String> aliases = store.aliases();
+			String alias = "";
+			while (aliases.hasMoreElements()) {
+				alias = aliases.nextElement();
+				// Break if alias refers to a private key because we want to use that
+				// certificate
+				if (isAliasWithPrivateKey = store.isKeyEntry(alias)) {
+					break;
+				}
+			}
+			if (isAliasWithPrivateKey) {
+				// Retrieves the certificate from the Java keystore
+				X509Certificate certificate = (X509Certificate) store.getCertificate(alias);
+				PrivateKey key = (PrivateKey) store.getKey(alias, password.toCharArray());
+				keyCert.setCertificate(certificate);
+				keyCert.setKey(key);
+			}
+			return keyCert;
+		}
+	}
+}
+
+class KeyCert {
+
+	X509Certificate certificate;
+	PrivateKey key;
+
+	public X509Certificate getCertificate() {
+		return certificate;
+	}
+
+	public void setCertificate(X509Certificate certificate) {
+		this.certificate = certificate;
+	}
+
+	public PrivateKey getKey() {
+		return key;
+	}
+
+	public void setKey(PrivateKey key) {
+		this.key = key;
+	}
+}
\ No newline at end of file
diff --git a/kms/src/main/java/org/apache/hadoop/crypto/key/DBToAzureKeyVault.java b/kms/src/main/java/org/apache/hadoop/crypto/key/DBToAzureKeyVault.java
new file mode 100644
index 0000000..bacc928
--- /dev/null
+++ b/kms/src/main/java/org/apache/hadoop/crypto/key/DBToAzureKeyVault.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key;
+
+import com.microsoft.azure.keyvault.KeyVaultClient;
+import java.io.IOException;
+import java.security.Key;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ranger.entity.XXRangerKeyStore;
+import org.apache.ranger.kms.dao.DaoManager;
+
+public class DBToAzureKeyVault {
+	private static final String AZURE_CLIENT_ID = "ranger.kms.azure.client.id";
+    private static final String AZURE_CLIENT_SECRET = "ranger.kms.azure.client.secret";
+    private static final String AZURE_MASTER_KEY_ALIAS = "ranger.kms.azure.masterkey.name";
+    private static final String AZURE_KEYVAULT_CERTIFICATE_PATH = "ranger.kms.azure.keyvault.certificate.path";
+    private static final String AZURE_KEYVAULT_URL = "ranger.kms.azurekeyvault.url";
+    private static final String ENCRYPTION_KEY = "ranger.db.encrypt.key.password";
+    private static final String AZURE_MASTER_KEY_TYPE= "ranger.kms.azure.masterkey.type";
+    private static final String ZONE_KEY_ENCRYPTION_ALGO = "ranger.kms.azure.zonekey.encryption.algorithm";
+    private RangerKeyStore dbStore;
+    public static void showUsage() {
+        System.err
+                        .println("USAGE: java "
+                                        + DBToAzureKeyVault.class.getName()
+                                        + " <azureMasterKeyName> <azureMasterKeyType> <zoneKeyEncryptionAlgo> <azureKeyVaultUrl> <azureClientId> <isSSLEnabled> <clientSecret / Certificate Path>");
+		}
+	
+	public static void main(String[] args) {
+		if (args.length < 7) {
+			System.err.println("Invalid number of parameters found.");
+			showUsage();
+			System.exit(1);
+		} else {
+			Configuration conf = RangerKeyStoreProvider.getDBKSConf();
+			String azureKeyName = args[0];
+			if (azureKeyName == null || azureKeyName.trim().isEmpty()) {
+				System.err.println("Azure master key name not provided.");
+				showUsage();
+				System.exit(1);
+			}
+			String azureMasterKeyType = args[1];
+			if (azureMasterKeyType == null
+					|| azureMasterKeyType.trim().isEmpty()) {
+				System.err.println("Azure master key type not provided.");
+				showUsage();
+				System.exit(1);
+			}
+			String zoneKeyEncryptionAlgo = args[2];
+			if (zoneKeyEncryptionAlgo == null
+					|| zoneKeyEncryptionAlgo.trim().isEmpty()) {
+				System.err
+						.println("Zone Key Encryption algorithm name not provided.");
+				showUsage();
+				System.exit(1);
+			}
+			String azureKeyVaultUrl = args[3];
+			if (azureKeyVaultUrl == null || azureKeyVaultUrl.trim().isEmpty()) {
+				System.err.println("Azure Key Vault url not provided.");
+				showUsage();
+				System.exit(1);
+			}
+			String azureClientId = args[4];
+			if (azureClientId == null || azureClientId.trim().isEmpty()) {
+				System.err.println("Azure Client Id is not provided.");
+				showUsage();
+				System.exit(1);
+			}
+			String isSSLEnabled = args[5];
+			if (isSSLEnabled == null || isSSLEnabled.trim().isEmpty()) {
+				System.err.println("isSSLEnabled not provided.");
+				showUsage();
+				System.exit(1);
+			}
+			if (!isSSLEnabled.equalsIgnoreCase("true")
+					&& !isSSLEnabled.equalsIgnoreCase("false")) {
+				System.err
+						.println("Please provide the valid value for isSSLEnabled");
+				showUsage();
+				System.exit(1);
+			}
+
+			String passwordOrCertPath = args[6];
+			String certificatePassword = null;
+			if (passwordOrCertPath == null
+					|| passwordOrCertPath.trim().isEmpty()) {
+				System.err
+						.println("Please provide Azure client password of certificate password");
+				showUsage();
+				System.exit(1);
+			}
+
+			boolean result = false;
+			boolean sslEnabled = false;
+			if (isSSLEnabled.equalsIgnoreCase("true")) {
+				sslEnabled = true;
+				if (!passwordOrCertPath.endsWith(".pem")
+						&& !passwordOrCertPath.endsWith(".pfx")) {
+					System.err
+							.println("Please provide valid certificate file path E.G .pem /.pfx");
+					showUsage();
+					System.exit(1);
+				} else {
+					if (args.length > 7 && !StringUtils.isEmpty(args[7])) {
+						certificatePassword = args[7];
+					}
+				}
+			}
+			result = new DBToAzureKeyVault().doExportMKToAzureKeyVault(
+					sslEnabled, azureKeyName, azureMasterKeyType,
+					zoneKeyEncryptionAlgo, azureClientId, azureKeyVaultUrl,
+					passwordOrCertPath, certificatePassword, conf);
+			if (result) {
+				System.out
+						.println("Master Key from Ranger KMS DB has been successfully imported into Azure Key Vault.");
+			} else {
+				System.out
+						.println("Import of Master Key from DB has been unsuccessful.");
+				System.exit(1);
+			}
+			System.exit(0);
+		}
+	}
+	
+	private boolean doExportMKToAzureKeyVault(boolean sslEnabled, String masterKeyName, String masterKeyType, String zoneKeyEncryptionAlgo,
+			String azureClientId, String azureKeyVaultUrl,
+			String passwordOrCertPath, String certificatePassword, Configuration conf) {
+		try {
+			String mKeyPass = conf.get(ENCRYPTION_KEY);
+			if (mKeyPass == null
+					|| mKeyPass.trim().equals("")
+					|| mKeyPass.trim().equals("_")
+					|| mKeyPass.trim().equals("crypted")) {
+				throw new IOException("Master Key Jceks does not exists");
+			}
+			conf.set(AZURE_MASTER_KEY_TYPE, masterKeyType);
+			conf.set(ZONE_KEY_ENCRYPTION_ALGO, zoneKeyEncryptionAlgo);
+			conf.set(AZURE_MASTER_KEY_ALIAS, masterKeyName);
+			conf.set(AZURE_CLIENT_ID, azureClientId);
+			conf.set(AZURE_KEYVAULT_URL, azureKeyVaultUrl);
+			RangerKMSDB rangerkmsDb = new RangerKMSDB(conf);
+			DaoManager daoManager = rangerkmsDb.getDaoManager();
+			KeyVaultClient kvClient = null;
+			if(sslEnabled){
+				conf.set(AZURE_KEYVAULT_CERTIFICATE_PATH, passwordOrCertPath);
+				AzureKeyVaultClientAuthenticator azureKVClientAuthenticator = new AzureKeyVaultClientAuthenticator(
+						azureClientId);
+				kvClient = !StringUtils.isEmpty(certificatePassword) ? azureKVClientAuthenticator
+						.getAuthentication(passwordOrCertPath, certificatePassword)
+						: azureKVClientAuthenticator.getAuthentication(passwordOrCertPath, "");
+				
+			}else{
+				conf.set(AZURE_CLIENT_SECRET, passwordOrCertPath);
+				AzureKeyVaultClientAuthenticator azureKVClientAuthenticator = new AzureKeyVaultClientAuthenticator(
+						azureClientId, passwordOrCertPath);
+				kvClient = new KeyVaultClient(
+						azureKVClientAuthenticator);
+			}
+			if(kvClient == null){
+				System.err.println("Key Vault is null. Please check the azure related configs.");
+				System.exit(1);
+			}
+			RangerKeyVaultKeyGenerator rangerKVKeyGenerator = new RangerKeyVaultKeyGenerator(
+					conf, kvClient);
+			boolean azureMKSuccess = rangerKVKeyGenerator.generateMasterKey(mKeyPass);
+			if (azureMKSuccess) {
+				dbStore = new RangerKeyStore(daoManager, conf, kvClient);
+				// Get Master Key from Ranger DB
+				RangerMasterKey rangerMasterKey = new RangerMasterKey(
+						daoManager);
+				char[] mkey = rangerMasterKey.getMasterKey(mKeyPass)
+						.toCharArray();
+				List<XXRangerKeyStore> rangerKeyStoreList = new ArrayList<XXRangerKeyStore>();
+				dbStore.engineLoad(null, mkey);
+				Enumeration<String> e = dbStore.engineAliases();
+				Key key;
+				String alias = null;
+				while (e.hasMoreElements()) {
+					alias = e.nextElement();
+					key = dbStore.engineGetKey(alias, mkey);
+					XXRangerKeyStore xxRangerKeyStore = dbStore
+							.convertKeysBetweenRangerKMSAndAzureKeyVault(alias,
+									key, rangerKVKeyGenerator);
+					rangerKeyStoreList.add(xxRangerKeyStore);
+				}
+				if (rangerKeyStoreList != null && !rangerKeyStoreList.isEmpty()) {
+					for(XXRangerKeyStore rangerKeyStore : rangerKeyStoreList){
+						dbStore.dbOperationStore(rangerKeyStore);
+					}
+				}
+				return true;
+			}
+			return false;
+		}catch(Throwable t){
+			throw new RuntimeException(
+                    "Unable to import Master key from Ranger DB to Azure Key Vault ",
+                    t);
+		}
+
+	}
+	
+}
diff --git a/kms/src/main/java/org/apache/hadoop/crypto/key/JKS2RangerUtil.java b/kms/src/main/java/org/apache/hadoop/crypto/key/JKS2RangerUtil.java
index 5e394de..75aa939 100644
--- a/kms/src/main/java/org/apache/hadoop/crypto/key/JKS2RangerUtil.java
+++ b/kms/src/main/java/org/apache/hadoop/crypto/key/JKS2RangerUtil.java
@@ -17,139 +17,233 @@
 
 package org.apache.hadoop.crypto.key;
 
+import com.microsoft.azure.keyvault.KeyVaultClient;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
 import java.util.Arrays;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.ranger.credentialapi.CredentialReader;
 import org.apache.ranger.kms.dao.DaoManager;
 
 public class JKS2RangerUtil {
-	
+
+	private static final String AZURE_KEYVAULT_ENABLED = "ranger.kms.azurekeyvault.enabled";
+	private static final String AZURE_KEYVAULT_SSL_ENABLED = "ranger.kms.azure.keyvault.ssl.enabled";
+	private static final String AZURE_CLIENT_ID = "ranger.kms.azure.client.id";
+	private static final String AZURE_CLIENT_SECRET = "ranger.kms.azure.client.secret";
+	private static final String AZURE_KEYVAULT_CERTIFICATE_PATH = "ranger.kms.azure.keyvault.certificate.path";
+	private static final String AZURE_KEYVAULT_CERTIFICATE_PASSWORD = "ranger.kms.azure.keyvault.certificate.password";
+	private static final String AZURE_CLIENT_SECRET_ALIAS = "ranger.kms.azure.client.secret.alias";
+	private static final String CREDENTIAL_PATH = "ranger.ks.jpa.jdbc.credential.provider.path";
 	private static final String DEFAULT_KEYSTORE_TYPE = "jceks";
 	private static final String ENCRYPTION_KEY = "ranger.db.encrypt.key.password";
-        private static final String KEYSECURE_ENABLED = "ranger.kms.keysecure.enabled";
-        private static final String KEYSECURE_USERNAME = "ranger.kms.keysecure.login.username";
-    private static final String KEYSECURE_PASSWORD = "ranger.kms.keysecure.login.password";
-    private static final String KEYSECURE_PASSWORD_ALIAS = "ranger.kms.keysecure.login.password.alias";
-    private static final String KEYSECURE_LOGIN = "ranger.kms.keysecure.login";
-    private static final String CREDENTIAL_PATH = "ranger.ks.jpa.jdbc.credential.provider.path";
-	
+	private static final String KEYSECURE_ENABLED = "ranger.kms.keysecure.enabled";
+	private static final String KEYSECURE_USERNAME = "ranger.kms.keysecure.login.username";
+	private static final String KEYSECURE_PASSWORD = "ranger.kms.keysecure.login.password";
+	private static final String KEYSECURE_PASSWORD_ALIAS = "ranger.kms.keysecure.login.password.alias";
+	private static final String KEYSECURE_LOGIN = "ranger.kms.keysecure.login";
+
 	public static void showUsage() {
-		System.err.println("USAGE: java " + JKS2RangerUtil.class.getName() + " <KMS_FileName> [KeyStoreType]");
-		System.err.println(" If KeyStoreType is not provided, it will be considered as " + DEFAULT_KEYSTORE_TYPE);
-		System.err.println(" When execution of this utility, it will prompt for both keystore password and key password.");
+		System.err.println("USAGE: java " + JKS2RangerUtil.class.getName()
+				+ " <KMS_FileName> [KeyStoreType]");
+		System.err
+				.println(" If KeyStoreType is not provided, it will be considered as "
+						+ DEFAULT_KEYSTORE_TYPE);
+		System.err
+				.println(" When execution of this utility, it will prompt for both keystore password and key password.");
 	}
-	
 
 	public static void main(String[] args) {
-			if (args.length == 0) {
-				System.err.println("Invalid number of parameters found.");
+		if (args.length == 0) {
+			System.err.println("Invalid number of parameters found.");
+			showUsage();
+			System.exit(1);
+		} else {
+			String keyStoreFileName = args[0];
+			File f = new File(keyStoreFileName);
+			if (!f.exists()) {
+				System.err.println("File: [" + f.getAbsolutePath()
+						+ "] does not exists.");
 				showUsage();
 				System.exit(1);
 			}
-			else {
-				String keyStoreFileName = args[0];
-				File f = new File(keyStoreFileName);
-				if (! f.exists()) {
-					System.err.println("File: [" + f.getAbsolutePath() + "] does not exists.");
-					showUsage();
-					System.exit(1);
-				}
-				String keyStoreType = (args.length == 2 ? args[1] : DEFAULT_KEYSTORE_TYPE);
-				try {
-					KeyStore.getInstance(keyStoreType);
-				} catch (KeyStoreException e) {
-					System.err.println("ERROR: Unable to get valid keystore for the type [" + keyStoreType + "]");
-					showUsage();
-					System.exit(1);
+			String keyStoreType = (args.length == 2 ? args[1]
+					: DEFAULT_KEYSTORE_TYPE);
+			try {
+				KeyStore.getInstance(keyStoreType);
+			} catch (KeyStoreException e) {
+				System.err
+						.println("ERROR: Unable to get valid keystore for the type ["
+								+ keyStoreType + "]");
+				showUsage();
+				System.exit(1);
+			}
+
+			new JKS2RangerUtil().doImportKeysFromJKS(keyStoreFileName,
+					keyStoreType);
+
+			System.out.println("Keys from " + keyStoreFileName
+					+ " has been successfully imported into RangerDB.");
+
+			System.exit(0);
+
+		}
+	}
+
+	private static void getFromJceks(Configuration conf, String path,
+			String alias, String key) {
+
+		// update credential from keystore
+		if (conf != null) {
+			String pathValue = conf.get(path);
+			String aliasValue = conf.get(alias);
+			if (pathValue != null && aliasValue != null) {
+				String xaDBPassword = CredentialReader.getDecryptedString(
+						pathValue.trim(), aliasValue.trim());
+				if (xaDBPassword != null && !xaDBPassword.trim().isEmpty()
+						&& !xaDBPassword.trim().equalsIgnoreCase("none")) {
+					conf.set(key, xaDBPassword);
 				}
-				
-				new JKS2RangerUtil().doImportKeysFromJKS(keyStoreFileName, keyStoreType);
-				
-				System.out.println("Keys from " + keyStoreFileName + " has been successfully imported into RangerDB.");
-				
-				System.exit(0);
-				
 			}
+		}
 	}
-	
-        private static void getFromJceks(Configuration conf, String path, String alias, String key) {
-
-        //update credential from keystore
-        if (conf != null) {
-            String pathValue = conf.get(path);
-            String aliasValue = conf.get(alias);
-            if (pathValue != null && aliasValue != null) {
-                String xaDBPassword = CredentialReader.getDecryptedString(pathValue.trim(), aliasValue.trim());
-                if (xaDBPassword != null && !xaDBPassword.trim().isEmpty() &&
-                        !xaDBPassword.trim().equalsIgnoreCase("none")) {
-                    conf.set(key, xaDBPassword);
-                }
-            }
-        }
-    }
-
-	private void doImportKeysFromJKS(String keyStoreFileName, String keyStoreType) {
+
+	private void doImportKeysFromJKS(String keyStoreFileName,
+			String keyStoreType) {
 		char[] keyStorePassword = null;
 		char[] keyPassword = null;
 		try {
-			keyStorePassword = ConsoleUtil.getPasswordFromConsole("Enter Password for the keystore FILE :");
-			keyPassword = ConsoleUtil.getPasswordFromConsole("Enter Password for the KEY(s) stored in the keystore:");
+			keyStorePassword = ConsoleUtil
+					.getPasswordFromConsole("Enter Password for the keystore FILE :");
+			keyPassword = ConsoleUtil
+					.getPasswordFromConsole("Enter Password for the KEY(s) stored in the keystore:");
 			Configuration conf = RangerKeyStoreProvider.getDBKSConf();
-			RangerKMSDB rangerkmsDb = new RangerKMSDB(conf);		
+			RangerKMSDB rangerkmsDb = new RangerKMSDB(conf);
 			DaoManager daoManager = rangerkmsDb.getDaoManager();
 			RangerKeyStore dbStore = new RangerKeyStore(daoManager);
-                        char[] masterKey;
+			char[] masterKey = null;
 			String password = conf.get(ENCRYPTION_KEY);
 			InputStream in = null;
 
+			if (conf != null
+					&& StringUtils.isNotEmpty(conf.get(KEYSECURE_ENABLED))
+					&& conf.get(KEYSECURE_ENABLED).equalsIgnoreCase("true")) {
 
-                        if (conf != null
-                                        && StringUtils.isNotEmpty(conf.get(KEYSECURE_ENABLED))
-                                        && conf.get(KEYSECURE_ENABLED).equalsIgnoreCase("true")) {
-
-                                getFromJceks(conf, CREDENTIAL_PATH, KEYSECURE_PASSWORD_ALIAS, KEYSECURE_PASSWORD);
-                                String keySecureLoginCred = conf.get(KEYSECURE_USERNAME).trim() + ":" + conf.get(KEYSECURE_PASSWORD);
-                                conf.set(KEYSECURE_LOGIN, keySecureLoginCred);
-
-                                RangerSafenetKeySecure rangerSafenetKeySecure = new RangerSafenetKeySecure(
-                                                conf);
-                                rangerSafenetKeySecure.generateMasterKey(password);
-                                masterKey = rangerSafenetKeySecure.getMasterKey(password).toCharArray();
-                        } else {
-                                RangerMasterKey rangerMasterKey = new RangerMasterKey(
-                                                daoManager);
-                                rangerMasterKey.generateMasterKey(password);
-                                masterKey = rangerMasterKey.getMasterKey(password)
-                                                .toCharArray();
-                        }
-
-
+				getFromJceks(conf, CREDENTIAL_PATH, KEYSECURE_PASSWORD_ALIAS,
+						KEYSECURE_PASSWORD);
+				String keySecureLoginCred = conf.get(KEYSECURE_USERNAME).trim()
+						+ ":" + conf.get(KEYSECURE_PASSWORD);
+				conf.set(KEYSECURE_LOGIN, keySecureLoginCred);
 
+				RangerSafenetKeySecure rangerSafenetKeySecure = new RangerSafenetKeySecure(
+						conf);
+				rangerSafenetKeySecure.generateMasterKey(password);
+				masterKey = rangerSafenetKeySecure.getMasterKey(password)
+						.toCharArray();
+			} else if (conf != null
+					&& StringUtils.isNotEmpty(conf.get(AZURE_KEYVAULT_ENABLED))
+					&& conf.get(AZURE_KEYVAULT_ENABLED)
+							.equalsIgnoreCase("true")) {
+				getFromJceks(conf, CREDENTIAL_PATH, AZURE_CLIENT_SECRET_ALIAS,
+						AZURE_CLIENT_SECRET);
+				String azureClientId = conf.get(AZURE_CLIENT_ID);
+				if (StringUtils.isEmpty(azureClientId)) {
+					throw new Exception(
+							"Azure Key Vault is enabled and client id is not configured");
+				}
+				String azureClientSecret = conf.get(AZURE_CLIENT_SECRET);
+				AzureKeyVaultClientAuthenticator azureKVClientAuthenticator;
+				RangerKeyVaultKeyGenerator rangerKVKeyGenerator = null;
+				KeyVaultClient kvClient = null;
+				if (conf != null
+						&& StringUtils.isNotEmpty(conf.get(AZURE_KEYVAULT_SSL_ENABLED))
+						&& conf.get(AZURE_KEYVAULT_SSL_ENABLED).equalsIgnoreCase("false")) {
+					try {
+						azureKVClientAuthenticator = new AzureKeyVaultClientAuthenticator(
+								azureClientId, azureClientSecret);
+						kvClient = new KeyVaultClient(
+								azureKVClientAuthenticator);
+					} catch (Exception ex) {
+						throw new Exception(
+								"Error while getting key vault client object with client id and client secret : "
+										+ ex);
+					}
+				} else {
+					try {
+						azureKVClientAuthenticator = new AzureKeyVaultClientAuthenticator(
+								azureClientId);
+						String keyVaultCertPath = conf
+								.get(AZURE_KEYVAULT_CERTIFICATE_PATH);
+						if (StringUtils.isEmpty(keyVaultCertPath)) {
+							throw new Exception(
+									"Azure Key Vault is enabled. Please provide client secret or certificate path for authentication.");
+						}
+						String keyVaultCertPassword = conf
+								.get(AZURE_KEYVAULT_CERTIFICATE_PASSWORD);
+						kvClient = !StringUtils.isEmpty(keyVaultCertPassword) ? azureKVClientAuthenticator
+								.getAuthentication(keyVaultCertPath,
+										keyVaultCertPassword)
+								: azureKVClientAuthenticator.getAuthentication(
+										keyVaultCertPath, "");
+					} catch (Exception ex) {
+						throw new Exception(
+								"Error while getting key vault client object with client id and certificate. Error :  : "
+										+ ex);
+					}
+				}
+				boolean success = false;
+				if (kvClient != null) {
+					try {
+						dbStore = new RangerKeyStore(daoManager, conf, kvClient);
+						rangerKVKeyGenerator = new RangerKeyVaultKeyGenerator(
+								conf, kvClient);
+						if (rangerKVKeyGenerator != null) {
+							success = rangerKVKeyGenerator
+									.generateMasterKey(password);
+						}
+					} catch (Exception ex) {
+						throw new Exception(
+								"Error while generating master key and master key secret in Azure key vault. Error :  : "
+										+ ex);
+					}
+				}
+				if (success) {
+					/* Master key not exportable from key vault */
+					masterKey = null;
+				}
+			} else {
+				RangerMasterKey rangerMasterKey = new RangerMasterKey(
+						daoManager);
+				rangerMasterKey.generateMasterKey(password);
+				masterKey = rangerMasterKey.getMasterKey(password)
+						.toCharArray();
+			}
 			try {
 				in = new FileInputStream(new File(keyStoreFileName));
-				dbStore.engineLoadKeyStoreFile(in, keyStorePassword, keyPassword, masterKey, keyStoreType);
-				dbStore.engineStore(null,masterKey);	
-			}
-			finally {
+				dbStore.engineLoadKeyStoreFile(in, keyStorePassword,
+						keyPassword, masterKey, keyStoreType);
+				dbStore.engineStore(null, masterKey);
+			} finally {
 				if (in != null) {
 					try {
 						in.close();
 					} catch (Exception e) {
-						throw new RuntimeException("ERROR:  Unable to close file stream for [" + keyStoreFileName + "]", e);
+						throw new RuntimeException(
+								"ERROR:  Unable to close file stream for ["
+										+ keyStoreFileName + "]", e);
 					}
 				}
 			}
-		}
-		catch(Throwable t) {
-			throw new RuntimeException("Unable to import keys from [" + keyStoreFileName + "] due to exception.", t);
-		}
-		finally{
+		} catch (Throwable t) {
+			throw new RuntimeException("Unable to import keys from ["
+					+ keyStoreFileName + "] due to exception.", t);
+		} finally {
 			Arrays.fill(keyStorePassword, ' ');
 			Arrays.fill(keyPassword, ' ');
 		}
diff --git a/kms/src/main/java/org/apache/hadoop/crypto/key/Ranger2JKSUtil.java b/kms/src/main/java/org/apache/hadoop/crypto/key/Ranger2JKSUtil.java
index f542364..6e4f75a 100644
--- a/kms/src/main/java/org/apache/hadoop/crypto/key/Ranger2JKSUtil.java
+++ b/kms/src/main/java/org/apache/hadoop/crypto/key/Ranger2JKSUtil.java
@@ -16,6 +16,8 @@
  */
 package org.apache.hadoop.crypto.key;
 
+import com.microsoft.azure.keyvault.KeyVaultClient;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -23,6 +25,7 @@ import java.io.OutputStream;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
 import java.util.Arrays;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.ranger.credentialapi.CredentialReader;
@@ -30,125 +33,201 @@ import org.apache.ranger.kms.dao.DaoManager;
 
 public class Ranger2JKSUtil {
 
+	private static final String AZURE_KEYVAULT_ENABLED = "ranger.kms.azurekeyvault.enabled";
+	private static final String AZURE_KEYVAULT_SSL_ENABLED = "ranger.kms.azure.keyvault.ssl.enabled";
+	private static final String AZURE_CLIENT_ID = "ranger.kms.azure.client.id";
+	private static final String AZURE_CLIENT_SECRET = "ranger.kms.azure.client.secret";
+	private static final String AZURE_CLIENT_SECRET_ALIAS = "ranger.kms.azure.client.secret.alias";
+	private static final String AZURE_KEYVAULT_CERTIFICATE_PATH = "ranger.kms.azure.keyvault.certificate.path";
+	private static final String AZURE_KEYVAULT_CERTIFICATE_PASSWORD = "ranger.kms.azure.keyvault.certificate.password";
+	private static final String CREDENTIAL_PATH = "ranger.ks.jpa.jdbc.credential.provider.path";
 	private static final String DEFAULT_KEYSTORE_TYPE = "jceks";
 	private static final String ENCRYPTION_KEY = "ranger.db.encrypt.key.password";
-        private static final String KEYSECURE_ENABLED = "ranger.kms.keysecure.enabled";
-        private static final String KEYSECURE_USERNAME = "ranger.kms.keysecure.login.username";
-    private static final String KEYSECURE_PASSWORD = "ranger.kms.keysecure.login.password";
-    private static final String KEYSECURE_PASSWORD_ALIAS = "ranger.kms.keysecure.login.password.alias";
-    private static final String KEYSECURE_LOGIN = "ranger.kms.keysecure.login";
-    private static final String CREDENTIAL_PATH = "ranger.ks.jpa.jdbc.credential.provider.path";
-	
+	private static final String KEYSECURE_ENABLED = "ranger.kms.keysecure.enabled";
+	private static final String KEYSECURE_USERNAME = "ranger.kms.keysecure.login.username";
+	private static final String KEYSECURE_PASSWORD = "ranger.kms.keysecure.login.password";
+	private static final String KEYSECURE_PASSWORD_ALIAS = "ranger.kms.keysecure.login.password.alias";
+	private static final String KEYSECURE_LOGIN = "ranger.kms.keysecure.login";
+
 	public static void showUsage() {
-		System.err.println("USAGE: java " + Ranger2JKSUtil.class.getName() + " <KMS_FileName> [KeyStoreType]");
-		System.err.println(" If KeyStoreType is not provided, it will be considered as " + DEFAULT_KEYSTORE_TYPE);
-		System.err.println(" When execution of this utility, it will prompt for both keystore password and key password.");
+		System.err.println("USAGE: java " + Ranger2JKSUtil.class.getName()
+				+ " <KMS_FileName> [KeyStoreType]");
+		System.err
+				.println(" If KeyStoreType is not provided, it will be considered as "
+						+ DEFAULT_KEYSTORE_TYPE);
+		System.err
+				.println(" When execution of this utility, it will prompt for both keystore password and key password.");
 	}
-	
 
 	public static void main(String[] args) throws IOException {
-			if (args.length == 0) {
-				System.err.println("Invalid number of parameters found.");
+		if (args.length == 0) {
+			System.err.println("Invalid number of parameters found.");
+			showUsage();
+			System.exit(1);
+		} else {
+			String keyStoreFileName = args[0];
+			File f = new File(keyStoreFileName);
+			if (!f.exists()) {
+				boolean ret = f.createNewFile();
+				if (!ret) {
+					System.err
+							.println("Error creating new keystore file. fileName="
+									+ args[0]);
+				}
+			}
+			String keyStoreType = (args.length == 2 ? args[1]
+					: DEFAULT_KEYSTORE_TYPE);
+			try {
+				KeyStore.getInstance(keyStoreType);
+			} catch (KeyStoreException e) {
+				System.err
+						.println("ERROR: Unable to get valid keystore for the type ["
+								+ keyStoreType + "]");
 				showUsage();
 				System.exit(1);
 			}
-			else {
-				String keyStoreFileName = args[0];
-				File f = new File(keyStoreFileName);
-				if (! f.exists()) {					
-					boolean ret = f.createNewFile();
-					if (!ret) {
-						System.err.println("Error creating new keystore file. fileName="+ args[0]);
-					}
-				}
-				String keyStoreType = (args.length == 2 ? args[1] : DEFAULT_KEYSTORE_TYPE);
-				try {
-					KeyStore.getInstance(keyStoreType);
-				} catch (KeyStoreException e) {
-					System.err.println("ERROR: Unable to get valid keystore for the type [" + keyStoreType + "]");
-					showUsage();
-					System.exit(1);
-				}
-				
-				new Ranger2JKSUtil().doExportKeysFromJKS(keyStoreFileName, keyStoreType);
-				
-				System.out.println("Keys from Ranger KMS Database has been successfully exported into " + keyStoreFileName);
-				
-				System.exit(0);
-				
-			}
+			new Ranger2JKSUtil().doExportKeysFromJKS(keyStoreFileName,
+					keyStoreType);
+			System.out
+					.println("Keys from Ranger KMS Database has been successfully exported into "
+							+ keyStoreFileName);
+			System.exit(0);
+		}
 	}
-	
-	private void doExportKeysFromJKS(String keyStoreFileName, String keyStoreType) {
+
+	private void doExportKeysFromJKS(String keyStoreFileName,
+			String keyStoreType) {
 		char[] keyStorePassword = null;
 		char[] keyPassword = null;
 		try {
-			keyStorePassword = ConsoleUtil.getPasswordFromConsole("Enter Password for the keystore FILE :");
-			keyPassword = ConsoleUtil.getPasswordFromConsole("Enter Password for the KEY(s) stored in the keystore:");
+			keyStorePassword = ConsoleUtil
+					.getPasswordFromConsole("Enter Password for the keystore FILE :");
+			keyPassword = ConsoleUtil
+					.getPasswordFromConsole("Enter Password for the KEY(s) stored in the keystore:");
 			Configuration conf = RangerKeyStoreProvider.getDBKSConf();
-			RangerKMSDB rangerkmsDb = new RangerKMSDB(conf);		
+			RangerKMSDB rangerkmsDb = new RangerKMSDB(conf);
 			DaoManager daoManager = rangerkmsDb.getDaoManager();
-			RangerKeyStore dbStore = new RangerKeyStore(daoManager);
-                        char[] masterKey;
+			RangerKeyStore dbStore;
+			char[] masterKey = null;
 			String password = conf.get(ENCRYPTION_KEY);
-                        if (conf != null
-                                        && StringUtils.isNotEmpty(conf.get(KEYSECURE_ENABLED))
-                                        && conf.get(KEYSECURE_ENABLED).equalsIgnoreCase("true")) {
+			if (conf != null
+					&& StringUtils.isNotEmpty(conf.get(KEYSECURE_ENABLED))
+					&& conf.get(KEYSECURE_ENABLED).equalsIgnoreCase("true")) {
 
-                                getFromJceks(conf, CREDENTIAL_PATH, KEYSECURE_PASSWORD_ALIAS, KEYSECURE_PASSWORD);
-                                String keySecureLoginCred = conf.get(KEYSECURE_USERNAME).trim() + ":" + conf.get(KEYSECURE_PASSWORD);
-                                conf.set(KEYSECURE_LOGIN, keySecureLoginCred);
+				getFromJceks(conf, CREDENTIAL_PATH, KEYSECURE_PASSWORD_ALIAS,
+						KEYSECURE_PASSWORD);
+				String keySecureLoginCred = conf.get(KEYSECURE_USERNAME).trim()
+						+ ":" + conf.get(KEYSECURE_PASSWORD);
+				conf.set(KEYSECURE_LOGIN, keySecureLoginCred);
 
-                                RangerSafenetKeySecure rangerSafenetKeySecure = new RangerSafenetKeySecure(
-                                                conf);
-                                masterKey = rangerSafenetKeySecure.getMasterKey(password)
-                                                .toCharArray();
+				RangerSafenetKeySecure rangerSafenetKeySecure = new RangerSafenetKeySecure(
+						conf);
+				masterKey = rangerSafenetKeySecure.getMasterKey(password)
+						.toCharArray();
+				dbStore = new RangerKeyStore(daoManager);
+
+			} else if (conf != null
+					&& StringUtils.isNotEmpty(conf.get(AZURE_KEYVAULT_ENABLED))
+					&& conf.get(AZURE_KEYVAULT_ENABLED)
+							.equalsIgnoreCase("true")) {
+				getFromJceks(conf, CREDENTIAL_PATH, AZURE_CLIENT_SECRET_ALIAS,
+						AZURE_CLIENT_SECRET);
+				String azureClientId = conf.get(AZURE_CLIENT_ID);
+				if (StringUtils.isEmpty(azureClientId)) {
+					throw new Exception(
+							"Azure Key Vault is enabled and client id is not configured");
+				}
+				String azureClientSecret = conf.get(AZURE_CLIENT_SECRET);
+				dbStore = new RangerKeyStore(daoManager);
+				AzureKeyVaultClientAuthenticator azureKVClientAuthenticator;
+				KeyVaultClient kvClient = null;
+				if (conf != null
+						&& StringUtils.isNotEmpty(conf.get(AZURE_KEYVAULT_SSL_ENABLED))
+						&& conf.get(AZURE_KEYVAULT_SSL_ENABLED).equalsIgnoreCase("false")) {
+					try {
+						azureKVClientAuthenticator = new AzureKeyVaultClientAuthenticator(
+								azureClientId, azureClientSecret);
+						kvClient = new KeyVaultClient(
+								azureKVClientAuthenticator);
+					} catch (Exception ex) {
+						throw new Exception(
+								"Error while getting key vault client object with client id and client secret : "
+										+ ex);
+					}
+				} else {
+					try {
+						azureKVClientAuthenticator = new AzureKeyVaultClientAuthenticator(
+								azureClientId);
+						String keyVaultCertPath = conf
+								.get(AZURE_KEYVAULT_CERTIFICATE_PATH);
+						if (StringUtils.isEmpty(keyVaultCertPath)) {
+							throw new Exception(
+									"Azure Key Vault is enabled. Please provide client secret or certificate path for authentication.");
+						}
+						String keyVaultCertPassword = conf
+								.get(AZURE_KEYVAULT_CERTIFICATE_PASSWORD);
 
-                        } else {
-                                RangerMasterKey rangerMasterKey = new RangerMasterKey(
-                                                daoManager);
-                                masterKey = rangerMasterKey.getMasterKey(password)
-                                                .toCharArray();
-                        }
+						kvClient = !StringUtils.isEmpty(keyVaultCertPassword) ? azureKVClientAuthenticator
+								.getAuthentication(keyVaultCertPath,
+										keyVaultCertPassword)
+								: azureKVClientAuthenticator.getAuthentication(
+										keyVaultCertPath, "");
+					} catch (Exception ex) {
+						throw new Exception(
+								"Error while getting key vault client object with client id and certificate. Error :  : "
+										+ ex);
+					}
+				}
+				if (kvClient != null) {
+					masterKey = null;
+					dbStore = new RangerKeyStore(daoManager, conf, kvClient);
+				}
+			} else {
+				RangerMasterKey rangerMasterKey = new RangerMasterKey(
+						daoManager);
+				masterKey = rangerMasterKey.getMasterKey(password)
+						.toCharArray();
+				dbStore = new RangerKeyStore(daoManager);
+			}
 			OutputStream out = null;
 			try {
 				out = new FileOutputStream(new File(keyStoreFileName));
-				dbStore.engineLoadToKeyStoreFile(out, keyStorePassword, keyPassword, masterKey, keyStoreType);
-			}
-			finally {
+				dbStore.engineLoadToKeyStoreFile(out, keyStorePassword,
+						keyPassword, masterKey, keyStoreType);
+			} finally {
 				if (out != null) {
 					try {
 						out.close();
 					} catch (Exception e) {
-						throw new RuntimeException("ERROR:  Unable to close file stream for [" + keyStoreFileName + "]", e);
+						throw new RuntimeException(
+								"ERROR:  Unable to close file stream for ["
+										+ keyStoreFileName + "]", e);
 					}
 				}
 			}
-		}
-		catch(Throwable t) {
-			throw new RuntimeException("Unable to export keys to [" + keyStoreFileName + "] due to exception.", t);
-		}
-		finally{
+		} catch (Throwable t) {
+			throw new RuntimeException("Unable to export keys to ["
+					+ keyStoreFileName + "] due to exception.", t);
+		} finally {
 			Arrays.fill(keyStorePassword, ' ');
 			Arrays.fill(keyPassword, ' ');
 		}
 	}
-	
-        private static void getFromJceks(Configuration conf, String path, String alias, String key) {
-
-        //update credential from keystore
-        if (conf != null) {
-            String pathValue = conf.get(path);
-            String aliasValue = conf.get(alias);
-            if (pathValue != null && aliasValue != null) {
-                String xaDBPassword = CredentialReader.getDecryptedString(pathValue.trim(), aliasValue.trim());
-                if (xaDBPassword != null && !xaDBPassword.trim().isEmpty() &&
-                        !xaDBPassword.trim().equalsIgnoreCase("none")) {
-                    conf.set(key, xaDBPassword);
-                }
-            }
-        }
-    }
-
 
+	private static void getFromJceks(Configuration conf, String path,
+			String alias, String key) {
+		// update credential from keystore
+		if (conf != null) {
+			String pathValue = conf.get(path);
+			String aliasValue = conf.get(alias);
+			if (pathValue != null && aliasValue != null) {
+				String xaDBPassword = CredentialReader.getDecryptedString(
+						pathValue.trim(), aliasValue.trim());
+				if (xaDBPassword != null && !xaDBPassword.trim().isEmpty()
+						&& !xaDBPassword.trim().equalsIgnoreCase("none")) {
+					conf.set(key, xaDBPassword);
+				}
+			}
+		}
+	}
 }
diff --git a/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyStore.java b/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyStore.java
index 86f1a29..f3d7c20 100644
--- a/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyStore.java
+++ b/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyStore.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.crypto.key;
 
+import com.microsoft.azure.keyvault.KeyVaultClient;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -54,21 +56,28 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import javax.crypto.Cipher;
 import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.KeyGenerator;
 import javax.crypto.SealedObject;
 import javax.crypto.SecretKey;
 import javax.crypto.SecretKeyFactory;
 import javax.crypto.spec.PBEKeySpec;
 import javax.crypto.spec.PBEParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
 import javax.xml.bind.DatatypeConverter;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.crypto.key.KeyProvider.Metadata;
+import org.apache.hadoop.crypto.key.RangerKeyStoreProvider.KeyMetadata;
 import org.apache.log4j.Logger;
 import org.apache.ranger.entity.XXRangerKeyStore;
 import org.apache.ranger.kms.dao.DaoManager;
 import org.apache.ranger.kms.dao.RangerKMSDao;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
 
 /**
  * This class provides the Database store implementation.
@@ -77,28 +86,40 @@ import org.apache.ranger.kms.dao.RangerKMSDao;
 public class RangerKeyStore extends KeyStoreSpi {
 
     static final Logger logger = Logger.getLogger(RangerKeyStore.class);
+    private static final String KEY_METADATA = "KeyMetadata";
     private static final String KEY_NAME_VALIDATION = "[a-z,A-Z,0-9](?!.*--)(?!.*__)(?!.*-_)(?!.*_-)[\\w\\-\\_]*";
     private static final Pattern pattern = Pattern.compile(KEY_NAME_VALIDATION);
+    private static final String AZURE_KEYVAULT_ENABLED = "ranger.kms.azurekeyvault.enabled";
+    private boolean azureKeyVaultEnabled = false;
 
     private DaoManager daoManager;
+    private RangerKeyVaultKeyGenerator kvKeyGen;
 
     // keys
     private static class KeyEntry {
         Date date = new Date(); // the creation date of this entry
     }
 
-    ;
-
     // Secret key
-    private static final class SecretKeyEntry {
-        Date date = new Date(); // the creation date of this entry
-        SealedObject sealedKey;
-        String cipher_field;
-        int bit_length;
-        String description;
-        String attributes;
-        int version;
-    }
+	private static final class SecretKeyEntry {
+		Date date = new Date(); // the creation date of this entry
+		SealedObject sealedKey;
+		String cipher_field;
+		int bit_length;
+		String description;
+		String attributes;
+		int version;
+	}
+
+	private static final class SecretKeyByteEntry {
+		Date date = new Date();
+		byte[] key;
+		String cipher_field;
+		int bit_length;
+		String description;
+		String attributes;
+		int version;
+	}
 
     private Map<String, Object> keyEntries = new ConcurrentHashMap<>();
     private Map<String, Object> deltaEntries = new ConcurrentHashMap<>();
@@ -109,6 +130,18 @@ public class RangerKeyStore extends KeyStoreSpi {
     public RangerKeyStore(DaoManager daoManager) {
         this.daoManager = daoManager;
     }
+    
+    public RangerKeyStore(DaoManager daoManager, Configuration conf, KeyVaultClient kvClient) {
+        this.daoManager = daoManager;
+        this.kvKeyGen = new RangerKeyVaultKeyGenerator(conf, kvClient);
+        if(conf != null
+				&& StringUtils.isNotEmpty(conf
+						.get(AZURE_KEYVAULT_ENABLED))
+				&& conf.get(AZURE_KEYVAULT_ENABLED).equalsIgnoreCase(
+						"true")){
+        	azureKeyVaultEnabled = true;
+        }
+    }
 
     String convertAlias(String alias) {
         return alias.toLowerCase();
@@ -120,13 +153,11 @@ public class RangerKeyStore extends KeyStoreSpi {
             logger.debug("==> RangerKeyStore.engineGetKey()");
         }
         Key key = null;
-
         Object entry = keyEntries.get(convertAlias(alias));
 
         if (!(entry instanceof SecretKeyEntry)) {
             return null;
         }
-
         try {
             key = unsealKey(((SecretKeyEntry) entry).sealedKey, password);
         } catch (Exception e) {
@@ -137,7 +168,85 @@ public class RangerKeyStore extends KeyStoreSpi {
         }
         return key;
     }
-
+    
+	public byte[] engineGetDecryptedZoneKeyByte(String alias) throws Exception {
+		try {
+			Object entry = keyEntries.get(convertAlias(alias));
+			if (!(entry instanceof SecretKeyByteEntry)) {
+				return null;
+			}
+			SecretKeyByteEntry key = (SecretKeyByteEntry) entry;
+			byte[] decryptKeyByte = kvKeyGen.dencryptZoneKey(key.key);
+			return decryptKeyByte;
+		} catch (Exception ex) {
+			throw new Exception("Error while decrpting zone key. Name : "
+					+ alias + " Error : " + ex);
+		}
+	}
+	
+	public Key engineGetDecryptedZoneKey(String alias) throws Exception {
+		byte[] decryptKeyByte = engineGetDecryptedZoneKeyByte(alias);
+		Metadata metadata = engineGetKeyMetadata(alias); 
+		Key k = new KeyByteMetadata(metadata, decryptKeyByte);
+		return k;
+	}
+	
+	public Metadata engineGetKeyMetadata(String alias) {
+		Object entry = keyEntries.get(convertAlias(alias));
+		if (!(entry instanceof SecretKeyByteEntry)) {
+			return null;
+		}
+		SecretKeyByteEntry key = (SecretKeyByteEntry) entry;
+		ObjectMapper mapper = new ObjectMapper();
+		Map<String, String> attributesMap = null;
+		try {
+			attributesMap = mapper.readValue(key.attributes,
+					new TypeReference<Map<String, String>>() {
+					});
+		} catch (JsonParseException e) {
+			logger.error("Invalid attribute string data: " + e.getMessage());
+
+		} catch (JsonMappingException e) {
+			logger.error("Invalid attribute string data: " + e.getMessage());
+		} catch (IOException e) {
+			logger.error("Invalid attribute string data: " + e.getMessage());
+		}
+		Metadata meta = new Metadata(key.cipher_field, key.bit_length,
+				key.description, attributesMap, key.date, key.version);
+		return meta;
+	}
+	
+	public void addSecureKeyByteEntry(String alias, Key key, String cipher,
+			int bitLength, String description, int version, String attributes)
+			throws KeyStoreException {
+		SecretKeyByteEntry entry = new SecretKeyByteEntry();
+		synchronized (deltaEntries) {
+			try {
+				entry.date = new Date();
+				// encrypt and store the key
+				entry.key = kvKeyGen.encryptZoneKey(key);
+				entry.cipher_field = cipher;
+				entry.bit_length = bitLength;
+				entry.description = description;
+				entry.version = version;
+				entry.attributes = attributes;
+				deltaEntries.put(convertAlias(alias), entry);
+
+			} catch (Exception e) {
+				logger.error(e.getMessage());
+				throw new KeyStoreException(e.getMessage());
+			}
+		}
+		synchronized (keyEntries) {
+			try {
+				keyEntries.put(convertAlias(alias), entry);
+			} catch (Exception e) {
+				logger.error(e.getMessage());
+				throw new KeyStoreException(e.getMessage());
+			}
+		}
+	}
+	
     @Override
     public Date engineGetCreationDate(String alias) {
         Object entry = keyEntries.get(convertAlias(alias));
@@ -151,7 +260,6 @@ public class RangerKeyStore extends KeyStoreSpi {
         return date;
     }
 
-
     public void addKeyEntry(String alias, Key key, char[] password, String cipher, int bitLength, String description, int version, String attributes)
             throws KeyStoreException {
         if (logger.isDebugEnabled()) {
@@ -283,48 +391,71 @@ public class RangerKeyStore extends KeyStoreSpi {
 
     @Override
     public void engineStore(OutputStream stream, char[] password)
-            throws IOException, NoSuchAlgorithmException, CertificateException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStore.engineStore()");
-        }
-        synchronized (deltaEntries) {
-            // password is mandatory when storing
-            if (password == null) {
-                throw new IllegalArgumentException("Ranger Master Key can't be null");
-            }
-
-            MessageDigest md = getKeyedMessageDigest(password);
-
-            byte digest[] = md.digest();
-            for (Entry<String, Object> entry : deltaEntries.entrySet()) {
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                DataOutputStream dos = new DataOutputStream(new DigestOutputStream(baos, md));
-
-                ObjectOutputStream oos = null;
-                try {
-
-                    oos = new ObjectOutputStream(dos);
-                    oos.writeObject(((SecretKeyEntry) entry.getValue()).sealedKey);
-
-                    dos.write(digest);
-                    dos.flush();
-                    Long creationDate = ((SecretKeyEntry) entry.getValue()).date.getTime();
-                    SecretKeyEntry secretKey = (SecretKeyEntry) entry.getValue();
-                    XXRangerKeyStore xxRangerKeyStore = mapObjectToEntity(entry.getKey(), creationDate, baos.toByteArray(), 
-                                                                          secretKey.cipher_field, secretKey.bit_length, secretKey.description, 
-                                                                          secretKey.version, secretKey.attributes);
-                    dbOperationStore(xxRangerKeyStore);
-                } finally {
-                    if (oos != null) {
-                        oos.close();
-                    } else {
-                        dos.close();
-                    }
-                }
-            }
-            clearDeltaEntires();
-        }
-    }
+			throws IOException, NoSuchAlgorithmException, CertificateException {
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStore.engineStore()");
+		}
+		synchronized (deltaEntries) {
+			if (azureKeyVaultEnabled) {
+				for (Entry<String, Object> entry : deltaEntries.entrySet()) {
+					Long creationDate = ((SecretKeyByteEntry) entry.getValue()).date
+							.getTime();
+					SecretKeyByteEntry secretSecureKey = (SecretKeyByteEntry) entry
+							.getValue();
+					XXRangerKeyStore xxRangerKeyStore = mapObjectToEntity(
+							entry.getKey(), creationDate, secretSecureKey.key,
+							secretSecureKey.cipher_field,
+							secretSecureKey.bit_length,
+							secretSecureKey.description,
+							secretSecureKey.version, secretSecureKey.attributes);
+					dbOperationStore(xxRangerKeyStore);
+				}
+
+			} else {
+				// password is mandatory when storing
+				if (password == null) {
+					throw new IllegalArgumentException(
+							"Ranger Master Key can't be null");
+				}
+
+				MessageDigest md = getKeyedMessageDigest(password);
+
+				byte digest[] = md.digest();
+				for (Entry<String, Object> entry : deltaEntries.entrySet()) {
+					ByteArrayOutputStream baos = new ByteArrayOutputStream();
+					DataOutputStream dos = new DataOutputStream(
+							new DigestOutputStream(baos, md));
+
+					ObjectOutputStream oos = null;
+					try {
+
+						oos = new ObjectOutputStream(dos);
+						oos.writeObject(((SecretKeyEntry) entry.getValue()).sealedKey);
+
+						dos.write(digest);
+						dos.flush();
+						Long creationDate = ((SecretKeyEntry) entry.getValue()).date
+								.getTime();
+						SecretKeyEntry secretKey = (SecretKeyEntry) entry
+								.getValue();
+						XXRangerKeyStore xxRangerKeyStore = mapObjectToEntity(
+								entry.getKey(), creationDate,
+								baos.toByteArray(), secretKey.cipher_field,
+								secretKey.bit_length, secretKey.description,
+								secretKey.version, secretKey.attributes);
+						dbOperationStore(xxRangerKeyStore);
+					} finally {
+						if (oos != null) {
+							oos.close();
+						} else {
+							dos.close();
+						}
+					}
+				}
+			}
+			clearDeltaEntires();
+		}
+	}
 
     private XXRangerKeyStore mapObjectToEntity(String alias, Long creationDate,
                                                byte[] byteArray, String cipher_field, int bit_length,
@@ -341,7 +472,7 @@ public class RangerKeyStore extends KeyStoreSpi {
         return xxRangerKeyStore;
     }
 
-    private void dbOperationStore(XXRangerKeyStore rangerKeyStore) {
+    public void dbOperationStore(XXRangerKeyStore rangerKeyStore) {
         if (logger.isDebugEnabled()) {
             logger.debug("==> RangerKeyStore.dbOperationStore()");
         }
@@ -381,104 +512,127 @@ public class RangerKeyStore extends KeyStoreSpi {
 
     @Override
     public void engineLoad(InputStream stream, char[] password)
-            throws IOException, NoSuchAlgorithmException, CertificateException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStore.engineLoad()");
-        }
-        synchronized (keyEntries) {
-            List<XXRangerKeyStore> rangerKeyDetails = dbOperationLoad();
-
-            DataInputStream dis;
-            MessageDigest md = null;
-
-            if (rangerKeyDetails == null || rangerKeyDetails.size() < 1) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("RangerKeyStore might be null or key is not present in the database.");
-                }
-                return;
-            }
-
-            keyEntries.clear();
-            if (password != null) {
-                md = getKeyedMessageDigest(password);
-            }
-
-            byte computed[] = {};
-            if (md != null) {
-                computed = md.digest();
-            }
-            for (XXRangerKeyStore rangerKey : rangerKeyDetails) {
-
-                String encoded = rangerKey.getEncoded();
-                byte[] data = DatatypeConverter.parseBase64Binary(encoded);
-
-                if (data != null && data.length > 0) {
-                    stream = new ByteArrayInputStream(data);
-                } else {
-                    logger.error("No Key found for alias " + rangerKey.getAlias());
-                }
-
-                if (computed != null) {
-                    int counter = 0;
-                    for (int i = computed.length - 1; i >= 0; i--) {
-                        if (computed[i] != data[data.length - (1 + counter)]) {
-                            Throwable t = new UnrecoverableKeyException
-                                    ("Password verification failed");
-                            logger.error("Keystore was tampered with, or password was incorrect.", t);
-                            throw (IOException) new IOException
-                                    ("Keystore was tampered with, or "
-                                            + "password was incorrect").initCause(t);
-                        } else {
-                            counter++;
-                        }
-                    }
-                }
-
-                if (password != null) {
-                    dis = new DataInputStream(new DigestInputStream(stream, md));
-                } else {
-                    dis = new DataInputStream(stream);
-                }
-
-                ObjectInputStream ois = null;
-                try {
-                    String alias;
-
-                    SecretKeyEntry entry = new SecretKeyEntry();
-
-                    //read the alias
-                    alias = rangerKey.getAlias();
-
-                    //read the (entry creation) date
-                    entry.date = new Date(rangerKey.getCreatedDate());
-                    entry.cipher_field = rangerKey.getCipher();
-                    entry.bit_length = rangerKey.getBitLength();
-                    entry.description = rangerKey.getDescription();
-                    entry.version = rangerKey.getVersion();
-                    entry.attributes = rangerKey.getAttributes();
-                    //read the sealed key
-                    try {
-                        ois = new ObjectInputStream(dis);
-                        entry.sealedKey = (SealedObject) ois.readObject();
-                    } catch (ClassNotFoundException cnfe) {
-                        throw new IOException(cnfe.getMessage());
-                    }
-
-                    //Add the entry to the list
-                    keyEntries.put(alias, entry);
-                } finally {
-                    if (ois != null) {
-                        ois.close();
-                    } else {
-                        dis.close();
-                    }
-                }
-            }
-        }
-    }
+			throws IOException, NoSuchAlgorithmException, CertificateException {
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStore.engineLoad()");
+		}
+
+		synchronized (keyEntries) {
+			List<XXRangerKeyStore> rangerKeyDetails = dbOperationLoad();
+
+			if (rangerKeyDetails == null || rangerKeyDetails.size() < 1) {
+				if (logger.isDebugEnabled()) {
+					logger.debug("RangerKeyStore might be null or key is not present in the database.");
+				}
+				return;
+			}
+
+			keyEntries.clear();
+			if (azureKeyVaultEnabled) {
+				for (XXRangerKeyStore rangerKey : rangerKeyDetails) {
+					String encodedStr = rangerKey.getEncoded();
+					byte[] encodedByte = DatatypeConverter
+							.parseBase64Binary(encodedStr);
+					String alias;
+					SecretKeyByteEntry entry = new SecretKeyByteEntry();
+					alias = rangerKey.getAlias();
+					entry.date = new Date(rangerKey.getCreatedDate());
+					entry.cipher_field = rangerKey.getCipher();
+					entry.bit_length = rangerKey.getBitLength();
+					entry.description = rangerKey.getDescription();
+					entry.version = rangerKey.getVersion();
+					entry.attributes = rangerKey.getAttributes();
+					entry.key = encodedByte;
+					keyEntries.put(alias, entry);
+				}
+			} else {
+				DataInputStream dis;
+				MessageDigest md = null;
+				if (password != null) {
+					md = getKeyedMessageDigest(password);
+				}
+
+				byte computed[] = {};
+				if (md != null) {
+					computed = md.digest();
+				}
+				for (XXRangerKeyStore rangerKey : rangerKeyDetails) {
+
+					String encoded = rangerKey.getEncoded();
+					byte[] data = DatatypeConverter.parseBase64Binary(encoded);
+
+					if (data != null && data.length > 0) {
+						stream = new ByteArrayInputStream(data);
+					} else {
+						logger.error("No Key found for alias "
+								+ rangerKey.getAlias());
+					}
+
+					if (computed != null) {
+						int counter = 0;
+						for (int i = computed.length - 1; i >= 0; i--) {
+							if (computed[i] != data[data.length - (1 + counter)]) {
+								Throwable t = new UnrecoverableKeyException(
+										"Password verification failed");
+								logger.error(
+										"Keystore was tampered with, or password was incorrect.",
+										t);
+								throw (IOException) new IOException(
+										"Keystore was tampered with, or "
+												+ "password was incorrect")
+										.initCause(t);
+							} else {
+								counter++;
+							}
+						}
+					}
+
+					if (password != null) {
+						dis = new DataInputStream(new DigestInputStream(stream,
+								md));
+					} else {
+						dis = new DataInputStream(stream);
+					}
+
+					ObjectInputStream ois = null;
+					try {
+						String alias;
+
+						SecretKeyEntry entry = new SecretKeyEntry();
+
+						// read the alias
+						alias = rangerKey.getAlias();
+
+						// read the (entry creation) date
+						entry.date = new Date(rangerKey.getCreatedDate());
+						entry.cipher_field = rangerKey.getCipher();
+						entry.bit_length = rangerKey.getBitLength();
+						entry.description = rangerKey.getDescription();
+						entry.version = rangerKey.getVersion();
+						entry.attributes = rangerKey.getAttributes();
+						// read the sealed key
+						try {
+							ois = new ObjectInputStream(dis);
+							entry.sealedKey = (SealedObject) ois.readObject();
+						} catch (ClassNotFoundException cnfe) {
+							throw new IOException(cnfe.getMessage());
+						}
+						// Add the entry to the list
+						keyEntries.put(alias, entry);
+					} finally {
+						if (ois != null) {
+							ois.close();
+						} else {
+							dis.close();
+						}
+					}
+				}
+			}
+		}
+	}
 
     private List<XXRangerKeyStore> dbOperationLoad() throws IOException {
-        if (logger.isDebugEnabled()) {
+    	if (logger.isDebugEnabled()) {
             logger.debug("==> RangerKeyStore.dbOperationLoad()");
         }
         try {
@@ -571,76 +725,151 @@ public class RangerKeyStore extends KeyStoreSpi {
 
     public void engineLoadKeyStoreFile(InputStream stream, char[] storePass,
                                        char[] keyPass, char[] masterKey, String fileFormat)
-            throws IOException, NoSuchAlgorithmException, CertificateException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStoreProvider.engineLoadKeyStoreFile()");
-        }
-        synchronized (deltaEntries) {
-            KeyStore ks;
-            try {
-                ks = KeyStore.getInstance(fileFormat);
-                ks.load(stream, storePass);
-                deltaEntries.clear();
-                for (Enumeration<String> name = ks.aliases(); name
-                        .hasMoreElements(); ) {
-                    SecretKeyEntry entry = new SecretKeyEntry();
-                    String alias = (String) name.nextElement();
-                    Key k = ks.getKey(alias, keyPass);
-
-                    if (k instanceof JavaKeyStoreProvider.KeyMetadata) {
-                        JavaKeyStoreProvider.KeyMetadata keyMetadata = (JavaKeyStoreProvider.KeyMetadata) k;
-                        Field f = JavaKeyStoreProvider.KeyMetadata.class
-                                .getDeclaredField(METADATA_FIELDNAME);
-                        f.setAccessible(true);
-                        Metadata metadata = (Metadata) f.get(keyMetadata);
-                        entry.bit_length = metadata.getBitLength();
-                        entry.cipher_field = metadata.getAlgorithm();
-                        Constructor<RangerKeyStoreProvider.KeyMetadata> constructor = RangerKeyStoreProvider.KeyMetadata.class
-                                .getDeclaredConstructor(Metadata.class);
-                        constructor.setAccessible(true);
-                        RangerKeyStoreProvider.KeyMetadata nk = constructor
-                                .newInstance(metadata);
-                        k = nk;
-                    } else {
-                        entry.bit_length = (k.getEncoded().length * NUMBER_OF_BITS_PER_BYTE);
-                        entry.cipher_field = k.getAlgorithm();
-                    }
-                    String keyName = alias.split("@")[0];
-                    validateKeyName(keyName);
-                    entry.attributes = "{\"key.acl.name\":\"" + keyName + "\"}";
-                    Class<?> c = null;
-                    Object o = null;
-                    try {
-                        c = Class
-                                .forName("com.sun.crypto.provider.KeyProtector");
-                        Constructor<?> constructor = c
-                                .getDeclaredConstructor(char[].class);
-                        constructor.setAccessible(true);
-                        o = constructor.newInstance(masterKey);
-                        // seal and store the key
-                        Method m = c.getDeclaredMethod("seal", Key.class);
-                        m.setAccessible(true);
-                        entry.sealedKey = (SealedObject) m.invoke(o, k);
-                    } catch (ClassNotFoundException | NoSuchMethodException
-                            | SecurityException | InstantiationException
-                            | IllegalAccessException | IllegalArgumentException
-                            | InvocationTargetException e) {
-                        logger.error(e.getMessage());
-                        throw new IOException(e.getMessage());
-                    }
-
-                    entry.date = ks.getCreationDate(alias);
-                    entry.version = (alias.split("@").length == 2) ? (Integer
-                            .parseInt(alias.split("@")[1])) : 0;
-                    entry.description = k.getFormat() + " - " + ks.getType();
-                    deltaEntries.put(alias, entry);
-                }
-            } catch (Throwable t) {
-                logger.error("Unable to load keystore file ", t);
-                throw new IOException(t);
-            }
-        }
-    }
+			throws IOException, NoSuchAlgorithmException, CertificateException {
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStoreProvider.engineLoadKeyStoreFile()");
+		}
+		synchronized (deltaEntries) {
+			KeyStore ks;
+			if (azureKeyVaultEnabled) {
+				try {
+					ks = KeyStore.getInstance(fileFormat);
+					ks.load(stream, storePass);
+					deltaEntries.clear();
+					for (Enumeration<String> name = ks.aliases(); name
+							.hasMoreElements();) {
+						SecretKeyByteEntry entry = new SecretKeyByteEntry();
+						String alias = (String) name.nextElement();
+						Key k = ks.getKey(alias, keyPass);
+						SecretKey secretKey = null;
+						if (k instanceof JavaKeyStoreProvider.KeyMetadata) {
+							JavaKeyStoreProvider.KeyMetadata keyMetadata = (JavaKeyStoreProvider.KeyMetadata) k;
+							Field f = JavaKeyStoreProvider.KeyMetadata.class
+									.getDeclaredField(METADATA_FIELDNAME);
+							f.setAccessible(true);
+							Metadata metadata = (Metadata) f.get(keyMetadata);
+							entry.bit_length = metadata.getBitLength();
+							entry.cipher_field = metadata.getAlgorithm();
+							entry.version = (alias.split("@").length == 2) ? (Integer
+									.parseInt(alias.split("@")[1])) : 0;
+							Constructor<RangerKeyStoreProvider.KeyMetadata> constructor = RangerKeyStoreProvider.KeyMetadata.class
+									.getDeclaredConstructor(Metadata.class);
+							constructor.setAccessible(true);
+							RangerKeyStoreProvider.KeyMetadata nk = constructor
+									.newInstance(metadata);
+							k = nk;
+							secretKey = new SecretKeySpec(k.getEncoded(),
+									getAlgorithm(metadata.getAlgorithm()));
+						} else if (k instanceof KeyByteMetadata) {
+							Metadata m = ((KeyByteMetadata) k).metadata;
+							byte[] encodedKey = ((KeyByteMetadata) k)
+									.getEncoded();
+							entry.cipher_field = m.getCipher();
+							entry.version = m.getVersions();
+							entry.bit_length = m.getBitLength();
+							if (encodedKey != null && encodedKey.length > 0) {
+								secretKey = new SecretKeySpec(encodedKey,
+										m.getAlgorithm());
+							}
+						} else {
+							entry.bit_length = (k.getEncoded().length * NUMBER_OF_BITS_PER_BYTE);
+							entry.cipher_field = k.getAlgorithm();
+							if (alias.split("@").length == 2) {
+								entry.version = Integer.parseInt(alias
+										.split("@")[1]) + 1;
+							} else {
+								entry.version = 1;
+							}
+							
+							if(k.getEncoded() != null && k.getEncoded().length > 0){
+								secretKey = new SecretKeySpec(k.getEncoded(),
+										getAlgorithm(k.getAlgorithm()));
+							}
+						}
+
+						String keyName = alias.split("@")[0];
+						validateKeyName(keyName);
+						entry.attributes = "{\"key.acl.name\":\"" + keyName
+								+ "\"}";
+						entry.key = kvKeyGen.encryptZoneKey(secretKey);
+						entry.date = ks.getCreationDate(alias);
+						entry.description = k.getFormat() + " - "
+								+ ks.getType();
+						deltaEntries.put(alias, entry);
+					}
+				} catch (Exception t) {
+					logger.error("Unable to load keystore file ", t);
+					throw new IOException(t);
+				}
+			} else {
+				try {
+					ks = KeyStore.getInstance(fileFormat);
+					ks.load(stream, storePass);
+					deltaEntries.clear();
+					for (Enumeration<String> name = ks.aliases(); name
+							.hasMoreElements();) {
+						SecretKeyEntry entry = new SecretKeyEntry();
+						String alias = (String) name.nextElement();
+						Key k = ks.getKey(alias, keyPass);
+
+						if (k instanceof JavaKeyStoreProvider.KeyMetadata) {
+							JavaKeyStoreProvider.KeyMetadata keyMetadata = (JavaKeyStoreProvider.KeyMetadata) k;
+							Field f = JavaKeyStoreProvider.KeyMetadata.class
+									.getDeclaredField(METADATA_FIELDNAME);
+							f.setAccessible(true);
+							Metadata metadata = (Metadata) f.get(keyMetadata);
+							entry.bit_length = metadata.getBitLength();
+							entry.cipher_field = metadata.getAlgorithm();
+							Constructor<RangerKeyStoreProvider.KeyMetadata> constructor = RangerKeyStoreProvider.KeyMetadata.class
+									.getDeclaredConstructor(Metadata.class);
+							constructor.setAccessible(true);
+							RangerKeyStoreProvider.KeyMetadata nk = constructor
+									.newInstance(metadata);
+							k = nk;
+						} else {
+							entry.bit_length = (k.getEncoded().length * NUMBER_OF_BITS_PER_BYTE);
+							entry.cipher_field = k.getAlgorithm();
+						}
+						String keyName = alias.split("@")[0];
+						validateKeyName(keyName);
+						entry.attributes = "{\"key.acl.name\":\"" + keyName
+								+ "\"}";
+						Class<?> c = null;
+						Object o = null;
+						try {
+							c = Class
+									.forName("com.sun.crypto.provider.KeyProtector");
+							Constructor<?> constructor = c
+									.getDeclaredConstructor(char[].class);
+							constructor.setAccessible(true);
+							o = constructor.newInstance(masterKey);
+							// seal and store the key
+							Method m = c.getDeclaredMethod("seal", Key.class);
+							m.setAccessible(true);
+							entry.sealedKey = (SealedObject) m.invoke(o, k);
+						} catch (ClassNotFoundException | NoSuchMethodException
+								| SecurityException | InstantiationException
+								| IllegalAccessException
+								| IllegalArgumentException
+								| InvocationTargetException e) {
+							logger.error(e.getMessage());
+							throw new IOException(e.getMessage());
+						}
+
+						entry.date = ks.getCreationDate(alias);
+						entry.version = (alias.split("@").length == 2) ? (Integer
+								.parseInt(alias.split("@")[1])) : 0;
+						entry.description = k.getFormat() + " - "
+								+ ks.getType();
+						deltaEntries.put(alias, entry);
+					}
+				} catch (Throwable t) {
+					logger.error("Unable to load keystore file ", t);
+					throw new IOException(t);
+				}
+			}
+		}
+	}
 
     public void engineLoadToKeyStoreFile(OutputStream stream, char[] storePass,
                                          char[] keyPass, char[] masterKey, String fileFormat)
@@ -661,8 +890,13 @@ public class RangerKeyStore extends KeyStoreSpi {
                     Key key;
                     while (e.hasMoreElements()) {
                         alias = e.nextElement();
-                        key = engineGetKey(alias, masterKey);
+                        if(azureKeyVaultEnabled){
+                        	key = engineGetDecryptedZoneKey(alias);
+                        }else{
+                        	key = engineGetKey(alias, masterKey);
+                        }
                         ks.setKeyEntry(alias, key, keyPass, null);
+                        
                     }
                     ks.store(stream, storePass);
                 }
@@ -686,6 +920,60 @@ public class RangerKeyStore extends KeyStoreSpi {
     public void clearDeltaEntires() {
         deltaEntries.clear();
     }
+    
+    private Object getKeyEntry(String alias) {
+    	   	return keyEntries.get(alias);
+    }
+
+	public XXRangerKeyStore convertKeysBetweenRangerKMSAndAzureKeyVault(
+			String alias, Key key,
+			RangerKeyVaultKeyGenerator rangerKVKeyGenerator) {
+		try {
+			XXRangerKeyStore xxRangerKeyStore;
+			SecretKeyEntry secretKey = (SecretKeyEntry) getKeyEntry(alias);
+			if (key instanceof KeyMetadata) {
+				Metadata meta = ((KeyMetadata) key).metadata;
+				KeyGenerator keyGenerator = KeyGenerator
+						.getInstance(getAlgorithm(meta.getCipher()));
+				keyGenerator.init(meta.getBitLength());
+				byte[] keyByte = keyGenerator.generateKey().getEncoded();
+				Key ezkey = new SecretKeySpec(keyByte,
+						getAlgorithm(meta.getCipher()));
+				byte[] encryptedKey = rangerKVKeyGenerator
+						.encryptZoneKey(ezkey);
+				Long creationDate = new Date().getTime();
+				String attributes = secretKey.attributes;
+				xxRangerKeyStore = mapObjectToEntity(alias, creationDate,
+						encryptedKey, meta.getCipher(), meta.getBitLength(),
+						meta.getDescription(), meta.getVersions(),
+						attributes);
+			} else {
+				byte[] encryptedKey = rangerKVKeyGenerator.encryptZoneKey(key);
+				Long creationDate = secretKey.date.getTime();
+				int version = secretKey.version;
+				if ((alias.split("@").length == 2)
+						&& (((Integer.parseInt(alias.split("@")[1])) + 1) != secretKey.version)) {
+					version++;
+				}
+				xxRangerKeyStore = mapObjectToEntity(alias, creationDate,
+						encryptedKey, secretKey.cipher_field,
+						secretKey.bit_length, secretKey.description, version,
+						secretKey.attributes);
+			}
+			return xxRangerKeyStore;
+		} catch (Throwable t) {
+			throw new RuntimeException(
+					"Migration failed between key secure and Ranger DB : ", t);
+		}
+	}
+
+	public String getAlgorithm(String cipher) {
+		int slash = cipher.indexOf(47);
+		if (slash == -1) {
+			return cipher;
+		}
+		return cipher.substring(0, slash);
+	}
 
     /**
      * Encapsulate the encrypted key, so that we can retrieve the AlgorithmParameters object on the decryption side
@@ -712,4 +1000,50 @@ public class RangerKeyStore extends KeyStoreSpi {
         }
 
     }
+    
+    public static class KeyByteMetadata implements Key, Serializable {
+        private Metadata metadata;
+        private byte[] keyByte;
+        
+        private final static long serialVersionUID = 8405872419967874451L;
+
+        private KeyByteMetadata(Metadata meta, byte[] encoded) {
+            this.metadata = meta;
+            this.keyByte = encoded;
+        }
+
+        @Override
+        public String getAlgorithm() {
+            return metadata.getCipher();
+        }
+
+        @Override
+        public String getFormat() {
+            return KEY_METADATA;
+        }
+
+        @Override
+        public byte[] getEncoded() {
+            return this.keyByte;
+        }
+
+        private void writeObject(ObjectOutputStream out) throws IOException {
+        	byte[] serialized = metadata.serialize();
+            
+            out.writeInt(serialized.length);
+            out.write(serialized);
+            out.writeInt(keyByte.length);
+            out.write(keyByte);
+        }
+
+        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        	  byte[] metadataBuf = new byte[in.readInt()];
+              in.readFully(metadataBuf);
+              metadata = new Metadata(metadataBuf);
+              byte[] keybyteBuf = new byte[in.readInt()];
+              in.readFully(keybyteBuf);
+              keyByte = keybyteBuf;
+        }
+
+    }
 }
diff --git a/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyStoreProvider.java b/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyStoreProvider.java
index b280cbf..1792bc4 100755
--- a/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyStoreProvider.java
+++ b/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyStoreProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.crypto.key;
 
+import com.microsoft.azure.keyvault.KeyVaultClient;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -35,9 +36,11 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.crypto.KeyGenerator;
 import javax.crypto.spec.SecretKeySpec;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -47,458 +50,660 @@ import org.apache.ranger.credentialapi.CredentialReader;
 import org.apache.ranger.kms.dao.DaoManager;
 import org.apache.log4j.Logger;
 
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 @InterfaceAudience.Private
 public class RangerKeyStoreProvider extends KeyProvider {
 
-    static final Logger logger = Logger.getLogger(RangerKeyStoreProvider.class);
-
-    public static final String SCHEME_NAME = "dbks";
-    public static final String KMS_CONFIG_DIR = "kms.config.dir";
-    public static final String DBKS_SITE_XML = "dbks-site.xml";
-    public static final String ENCRYPTION_KEY = "ranger.db.encrypt.key.password";
-    private static final String KEY_METADATA = "KeyMetadata";
-    private static final String CREDENTIAL_PATH = "ranger.ks.jpa.jdbc.credential.provider.path";
-    private static final String MK_CREDENTIAL_ALIAS = "ranger.ks.masterkey.credential.alias";
-    private static final String DB_CREDENTIAL_ALIAS = "ranger.ks.jpa.jdbc.credential.alias";
-    private static final String DB_PASSWORD = "ranger.ks.jpa.jdbc.password";
-    private static final String HSM_ENABLED = "ranger.ks.hsm.enabled";
-    private static final String HSM_PARTITION_PASSWORD_ALIAS = "ranger.ks.hsm.partition.password.alias";
-    private static final String HSM_PARTITION_PASSWORD = "ranger.ks.hsm.partition.password";
-    private static final String KEYSECURE_ENABLED = "ranger.kms.keysecure.enabled";
-
-        private static final String KEYSECURE_USERNAME = "ranger.kms.keysecure.login.username";
-        private static final String KEYSECURE_PASSWORD_ALIAS = "ranger.kms.keysecure.login.password.alias";
-    private static final String KEYSECURE_PASSWORD = "ranger.kms.keysecure.login.password";
-    private static final String KEYSECURE_LOGIN = "ranger.kms.keysecure.login";
-
-    private final RangerKeyStore dbStore;
-    private char[] masterKey;
-    private boolean changed = false;
-    private final Map<String, Metadata> cache = new HashMap<String, Metadata>();
-    private DaoManager daoManager;
-
-    private Lock readLock;
-
-    public RangerKeyStoreProvider(Configuration conf) throws Throwable {
-        super(conf);
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStoreProvider.Configuration(conf)");
-        }
-        conf = getDBKSConf();
-        getFromJceks(conf, CREDENTIAL_PATH, MK_CREDENTIAL_ALIAS, ENCRYPTION_KEY);
-        getFromJceks(conf, CREDENTIAL_PATH, DB_CREDENTIAL_ALIAS, DB_PASSWORD);
-        getFromJceks(conf, CREDENTIAL_PATH, HSM_PARTITION_PASSWORD_ALIAS, HSM_PARTITION_PASSWORD);
-        RangerKMSDB rangerKMSDB = new RangerKMSDB(conf);
-        daoManager = rangerKMSDB.getDaoManager();
-
-        RangerKMSMKI rangerMasterKey = null;
-        String password = conf.get(ENCRYPTION_KEY);
-        if (password == null || password.trim().equals("") || password.trim().equals("_") || password.trim().equals("crypted")) {
-            throw new IOException("The Ranger MasterKey Password is empty or not a valid Password");
-        }
-        if (StringUtils.isEmpty(conf.get(HSM_ENABLED)) || conf.get(HSM_ENABLED).equalsIgnoreCase("false")) {
-            logger.info("Ranger KMS Database is enabled for storing master key.");
-            rangerMasterKey = new RangerMasterKey(daoManager);
-        } else {
-            logger.info("Ranger KMS HSM is enabled for storing master key.");
-            rangerMasterKey = new RangerHSM(conf);
-            String partitionPasswd = conf.get(HSM_PARTITION_PASSWORD);
-            if (partitionPasswd == null || partitionPasswd.trim().equals("") || partitionPasswd.trim().equals("_") || partitionPasswd.trim().equals("crypted")) {
-                throw new IOException("Partition Password doesn't exists");
-            }
-        }
-
-                if (conf != null && StringUtils.isNotEmpty(conf.get(KEYSECURE_ENABLED))
-                                && conf.get(KEYSECURE_ENABLED).equalsIgnoreCase("true")) {
-                        getFromJceks(conf, CREDENTIAL_PATH, KEYSECURE_PASSWORD_ALIAS, KEYSECURE_PASSWORD);
-                        String keySecureLoginCred = conf.get(KEYSECURE_USERNAME).trim() + ":" + conf.get(KEYSECURE_PASSWORD);
-                        conf.set(KEYSECURE_LOGIN, keySecureLoginCred);
-
-                        rangerMasterKey = new RangerSafenetKeySecure(conf);
-
-                        dbStore = new RangerKeyStore(daoManager);
-                        // generate master key on key secure server
-                        rangerMasterKey.generateMasterKey(password);
-                        try {
-                                masterKey = rangerMasterKey.getMasterKey(password)
-                                                .toCharArray();
-                        } catch (Exception ex) {
-                                throw new Exception("Error while getting Safenet KeySecure master key " + ex);
-                        }
-
-                } else {
-                        dbStore = new RangerKeyStore(daoManager);
-                        rangerMasterKey.generateMasterKey(password);
-                        // code to retrieve rangerMasterKey password
-                        try {
-                                masterKey = rangerMasterKey.getMasterKey(password)
-                                                .toCharArray();
-                        } catch (Exception ex) {
-                                throw new Exception("Error while getting Ranger Master key " + ex);
-                        }
-                }
-        reloadKeys();
-        ReadWriteLock lock = new ReentrantReadWriteLock(true);
-        readLock = lock.readLock();
-    }
-
-    public static Configuration getDBKSConf() {
-        Configuration newConfig = getConfiguration(true, DBKS_SITE_XML);
-        getFromJceks(newConfig, CREDENTIAL_PATH, MK_CREDENTIAL_ALIAS, ENCRYPTION_KEY);
-        getFromJceks(newConfig, CREDENTIAL_PATH, DB_CREDENTIAL_ALIAS, DB_PASSWORD);
-        return newConfig;
-
-    }
-
-    static Configuration getConfiguration(boolean loadHadoopDefaults,
-                                          String... resources) {
-        Configuration conf = new Configuration(loadHadoopDefaults);
-        String confDir = System.getProperty(KMS_CONFIG_DIR);
-        if (confDir != null) {
-            try {
-                Path confPath = new Path(confDir);
-                if (!confPath.isUriPathAbsolute()) {
-                    throw new RuntimeException("System property '" + KMS_CONFIG_DIR +
-                            "' must be an absolute path: " + confDir);
-                }
-                for (String resource : resources) {
-                    conf.addResource(new URL("file://" + new Path(confDir, resource).toUri()));
-                }
-            } catch (MalformedURLException ex) {
-                logger.error("==> RangerKeyStoreProvider.getConfiguration() error : ", ex);
-                throw new RuntimeException(ex);
-            }
-        } else {
-            for (String resource : resources) {
-                conf.addResource(resource);
-            }
-        }
-        return conf;
-    }
-
-    private void loadKeys(char[] masterKey) throws NoSuchAlgorithmException, CertificateException, IOException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStoreProvider.loadKeys()");
-        }
-        dbStore.engineLoad(null, masterKey);
-    }
-
-    @Override
-    public KeyVersion createKey(String name, byte[] material, Options options)
-            throws IOException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStoreProvider.createKey()");
-        }
-        reloadKeys();
-        if (dbStore.engineContainsAlias(name) || cache.containsKey(name)) {
-            throw new IOException("Key " + name + " already exists");
-        }
-        Metadata meta = new Metadata(options.getCipher(), options.getBitLength(),
-                options.getDescription(), options.getAttributes(), new Date(), 1);
-        if (options.getBitLength() != 8 * material.length) {
-            throw new IOException("Wrong key length. Required " +
-                    options.getBitLength() + ", but got " + (8 * material.length));
-        }
-        cache.put(name, meta);
-        String versionName = buildVersionName(name, 0);
-        if (logger.isDebugEnabled()) {
-            logger.debug("<== RangerKeyStoreProvider.createKey()");
-        }
-        return innerSetKeyVersion(name, versionName, material, meta.getCipher(), meta.getBitLength(), meta.getDescription(), meta.getVersions(), meta.getAttributes());
-    }
-
-    KeyVersion innerSetKeyVersion(String name, String versionName, byte[] material, String cipher, int bitLength, String description, int version, Map<String, String> attributes) throws IOException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStoreProvider.innerSetKeyVersion()");
-            logger.debug("name : " + name + " and versionName : " + versionName);
-        }
-        try {
-            String attribute = JsonUtilsV2.mapToJson(attributes);
-            dbStore.addKeyEntry(versionName, new SecretKeySpec(material, cipher), masterKey, cipher, bitLength, description, version, attribute);
-        } catch (Exception e) {
-            throw new IOException("Can't store key " + versionName, e);
-        }
-        changed = true;
-        if (logger.isDebugEnabled()) {
-            logger.debug("<== RangerKeyStoreProvider.innerSetKeyVersion()");
-        }
-        return new KeyVersion(name, versionName, material);
-    }
-
-    @Override
-    public void deleteKey(String name) throws IOException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStoreProvider.deleteKey(" + name + ")");
-        }
-        reloadKeys();
-        Metadata meta = getMetadata(name);
-        if (meta == null) {
-            throw new IOException("Key " + name + " does not exist");
-        }
-        for (int v = 0; v < meta.getVersions(); ++v) {
-            String versionName = buildVersionName(name, v);
-            try {
-                if (dbStore.engineContainsAlias(versionName)) {
-                    dbStore.engineDeleteEntry(versionName);
-                }
-            } catch (KeyStoreException e) {
-                throw new IOException("Problem removing " + versionName, e);
-            }
-        }
-        try {
-            if (dbStore.engineContainsAlias(name)) {
-                dbStore.engineDeleteEntry(name);
-            }
-        } catch (KeyStoreException e) {
-            throw new IOException("Problem removing " + name + " from " + this, e);
-        }
-        cache.remove(name);
-        changed = true;
-    }
-
-    @Override
-    public void flush() throws IOException {
-        try {
-            if (!changed) {
-                return;
-            }
-            // put all of the updates into the db
-            for (Map.Entry<String, Metadata> entry : cache.entrySet()) {
-                try {
-                    Metadata metadata = entry.getValue();
-                    String attributes = JsonUtilsV2.mapToJson(metadata.getAttributes());
-                    dbStore.addKeyEntry(entry.getKey(), new KeyMetadata(metadata), masterKey, metadata.getAlgorithm(), metadata.getBitLength(), metadata.getDescription(), metadata.getVersions(), attributes);
-                } catch (Exception e) {
-                    throw new IOException("Can't set metadata key " + entry.getKey(), e);
-                }
-            }
-            try {
-                dbStore.engineStore(null, masterKey);
-                reloadKeys();
-            } catch (NoSuchAlgorithmException e) {
-                throw new IOException("No such algorithm storing key", e);
-            } catch (CertificateException e) {
-                throw new IOException("Certificate exception storing key", e);
-            }
-            changed = false;
-        } catch (IOException ioe) {
-            cache.clear();
-            reloadKeys();
-            throw ioe;
-        }
-    }
-
-    @Override
-    public KeyVersion getKeyVersion(String versionName) throws IOException {
-        readLock.lock();
-        try {
-            SecretKeySpec key = null;
-            try {
-                if (!dbStore.engineContainsAlias(versionName)) {
-                    dbStore.engineLoad(null, masterKey);
-                    if (!dbStore.engineContainsAlias(versionName)) {
-                        return null;
-                    }
-                }
-                key = (SecretKeySpec) dbStore.engineGetKey(versionName, masterKey);
-            } catch (NoSuchAlgorithmException e) {
-                throw new IOException("Can't get algorithm for key " + key, e);
-            } catch (UnrecoverableKeyException e) {
-                throw new IOException("Can't recover key " + key, e);
-            } catch (CertificateException e) {
-                throw new IOException("Certificate exception storing key", e);
-            }
-            if (key == null) {
-                return null;
-            } else {
-                return new KeyVersion(getBaseName(versionName), versionName, key.getEncoded());
-            }
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    @Override
-    public List<KeyVersion> getKeyVersions(String name) throws IOException {
-        List<KeyVersion> list = new ArrayList<KeyVersion>();
-        Metadata km = getMetadata(name);
-        if (km != null) {
-            int latestVersion = km.getVersions();
-            KeyVersion v = null;
-            String versionName = null;
-            for (int i = 0; i < latestVersion; i++) {
-                versionName = buildVersionName(name, i);
-                v = getKeyVersion(versionName);
-                if (v != null) {
-                    list.add(v);
-                }
-            }
-        }
-        return list;
-    }
-
-    @Override
-    public List<String> getKeys() throws IOException {
-        ArrayList<String> list = new ArrayList<String>();
-        String alias = null;
-        reloadKeys();
-        Enumeration<String> e = dbStore.engineAliases();
-        while (e.hasMoreElements()) {
-            alias = e.nextElement();
-            // only include the metadata key names in the list of names
-            if (!alias.contains("@")) {
-                list.add(alias);
-            }
-        }
-        return list;
-    }
-
-    @Override
-    public Metadata getMetadata(String name) throws IOException {
-        try {
-            readLock.lock();
-            if (cache.containsKey(name)) {
-                Metadata meta = cache.get(name);
-                return meta;
-            }
-            try {
-                if (!dbStore.engineContainsAlias(name)) {
-                    dbStore.engineLoad(null, masterKey);
-                    if (!dbStore.engineContainsAlias(name)) {
-                        return null;
-                    }
-                }
-                Key key = dbStore.engineGetKey(name, masterKey);
-                if (key != null) {
-                    Metadata meta = ((KeyMetadata) key).metadata;
-                    cache.put(name, meta);
-                    return meta;
-                }
-            } catch (NoSuchAlgorithmException e) {
-                throw new IOException("Can't get algorithm for " + name, e);
-            } catch (UnrecoverableKeyException e) {
-                throw new IOException("Can't recover key for " + name, e);
-            }
-            return null;
-        } catch (Exception e) {
-            throw new IOException("Please try again ", e);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    @Override
-    public KeyVersion rollNewVersion(String name, byte[] material) throws IOException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStoreProvider.rollNewVersion()");
-        }
-        reloadKeys();
-        Metadata meta = getMetadata(name);
-        if (meta == null) {
-            throw new IOException("Key " + name + " not found");
-        }
-        if (meta.getBitLength() != 8 * material.length) {
-            throw new IOException("Wrong key length. Required " + meta.getBitLength() + ", but got " + (8 * material.length));
-        }
-        int nextVersion = meta.addVersion();
-        String versionName = buildVersionName(name, nextVersion);
-        return innerSetKeyVersion(name, versionName, material, meta.getCipher(), meta.getBitLength(), meta.getDescription(), meta.getVersions(), meta.getAttributes());
-    }
-
-    private static void getFromJceks(Configuration conf, String path, String alias, String key) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStoreProvider.getFromJceks()");
-        }
-        //update credential from keystore
-        if (conf != null) {
-            String pathValue = conf.get(path);
-            String aliasValue = conf.get(alias);
-            if (pathValue != null && aliasValue != null) {
-                String xaDBPassword = CredentialReader.getDecryptedString(pathValue.trim(), aliasValue.trim());
-                if (xaDBPassword != null && !xaDBPassword.trim().isEmpty() &&
-                        !xaDBPassword.trim().equalsIgnoreCase("none")) {
-                    conf.set(key, xaDBPassword);
-                } else {
-                    logger.info("Credential keystore password not applied for KMS; clear text password shall be applicable");
-                }
-            }
-        }
-    }
-
-    private void reloadKeys() throws IOException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("==> RangerKeyStoreProvider.reloadKeys()");
-        }
-        try {
-            cache.clear();
-            loadKeys(masterKey);
-        } catch (NoSuchAlgorithmException e) {
-            throw new IOException("Can't load Keys");
-        } catch (CertificateException e) {
-            throw new IOException("Can't load Keys");
-        }
-    }
-
-    /**
-     * The factory to create JksProviders, which is used by the ServiceLoader.
-     */
-    public static class Factory extends KeyProviderFactory {
-        @Override
-        public KeyProvider createProvider(URI providerName,
-                                          Configuration conf) throws IOException {
-            try {
-                if (SCHEME_NAME.equals(providerName.getScheme())) {
-                    return new RangerKeyStoreProvider(conf);
-                }
-            } catch (Throwable e) {
-                logger.error("==> RangerKeyStoreProvider.reloadKeys() error : " , e);
-            }
-            return null;
-        }
-    }
-
-    /**
-     * An adapter between a KeyStore Key and our Metadata. This is used to store
-     * the metadata in a KeyStore even though isn't really a key.
-     */
-    public static class KeyMetadata implements Key, Serializable {
-        private Metadata metadata;
-        private final static long serialVersionUID = 8405872419967874451L;
-
-        private KeyMetadata(Metadata meta) {
-            this.metadata = meta;
-        }
-
-        @Override
-        public String getAlgorithm() {
-            return metadata.getCipher();
-        }
-
-        @Override
-        public String getFormat() {
-            return KEY_METADATA;
-        }
-
-        @Override
-        public byte[] getEncoded() {
-            return new byte[0];
-        }
-
-        private void writeObject(ObjectOutputStream out) throws IOException {
-            byte[] serialized = metadata.serialize();
-            out.writeInt(serialized.length);
-            out.write(serialized);
-        }
-
-        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-            byte[] buf = new byte[in.readInt()];
-            in.readFully(buf);
-            metadata = new Metadata(buf);
-        }
-
-    }
+	static final Logger logger = Logger.getLogger(RangerKeyStoreProvider.class);
+
+	public static final String SCHEME_NAME = "dbks";
+	public static final String KMS_CONFIG_DIR = "kms.config.dir";
+	public static final String DBKS_SITE_XML = "dbks-site.xml";
+	public static final String ENCRYPTION_KEY = "ranger.db.encrypt.key.password";
+	private static final String KEY_METADATA = "KeyMetadata";
+	private static final String CREDENTIAL_PATH = "ranger.ks.jpa.jdbc.credential.provider.path";
+	private static final String MK_CREDENTIAL_ALIAS = "ranger.ks.masterkey.credential.alias";
+	private static final String DB_CREDENTIAL_ALIAS = "ranger.ks.jpa.jdbc.credential.alias";
+	private static final String DB_PASSWORD = "ranger.ks.jpa.jdbc.password";
+	private static final String HSM_ENABLED = "ranger.ks.hsm.enabled";
+	private static final String HSM_PARTITION_PASSWORD_ALIAS = "ranger.ks.hsm.partition.password.alias";
+	private static final String HSM_PARTITION_PASSWORD = "ranger.ks.hsm.partition.password";
+	private static final String KEYSECURE_ENABLED = "ranger.kms.keysecure.enabled";
+	private static final String KEYSECURE_USERNAME = "ranger.kms.keysecure.login.username";
+	private static final String KEYSECURE_PASSWORD_ALIAS = "ranger.kms.keysecure.login.password.alias";
+	private static final String KEYSECURE_PASSWORD = "ranger.kms.keysecure.login.password";
+	private static final String KEYSECURE_LOGIN = "ranger.kms.keysecure.login";
+	private static final String AZURE_KEYVAULT_ENABLED = "ranger.kms.azurekeyvault.enabled";
+	private static final String AZURE_KEYVAULT_SSL_ENABLED = "ranger.kms.azure.keyvault.ssl.enabled";
+	private static final String AZURE_CLIENT_ID = "ranger.kms.azure.client.id";
+	private static final String AZURE_CLIENT_SECRET_ALIAS = "ranger.kms.azure.client.secret.alias";
+	private static final String AZURE_CLIENT_SECRET = "ranger.kms.azure.client.secret";
+	private static final String AZURE_KEYVAULT_CERTIFICATE_PATH = "ranger.kms.azure.keyvault.certificate.path";
+	private static final String AZURE_KEYVAULT_CERTIFICATE_PASSWORD = "ranger.kms.azure.keyvault.certificate.password";
+	private final RangerKeyStore dbStore;
+	private char[] masterKey;
+	private boolean changed = false;
+	private final Map<String, Metadata> cache = new HashMap<String, Metadata>();
+	private DaoManager daoManager;
+	private Lock readLock;
+	private boolean azureKeyVaultEnabled = false;
+
+	public RangerKeyStoreProvider(Configuration conf) throws Throwable {
+		super(conf);
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStoreProvider.Configuration(conf)");
+		}
+		conf = getDBKSConf();
+		getFromJceks(conf, CREDENTIAL_PATH, MK_CREDENTIAL_ALIAS, ENCRYPTION_KEY);
+		getFromJceks(conf, CREDENTIAL_PATH, DB_CREDENTIAL_ALIAS, DB_PASSWORD);
+		getFromJceks(conf, CREDENTIAL_PATH, HSM_PARTITION_PASSWORD_ALIAS,
+				HSM_PARTITION_PASSWORD);
+		RangerKMSDB rangerKMSDB = new RangerKMSDB(conf);
+		daoManager = rangerKMSDB.getDaoManager();
+
+		RangerKMSMKI rangerMasterKey = null;
+		String password = conf.get(ENCRYPTION_KEY);
+		if (password == null || password.trim().equals("")
+				|| password.trim().equals("_")
+				|| password.trim().equals("crypted")) {
+			throw new IOException(
+					"The Ranger MasterKey Password is empty or not a valid Password");
+		}
+		if (StringUtils.isEmpty(conf.get(HSM_ENABLED))
+				|| conf.get(HSM_ENABLED).equalsIgnoreCase("false")) {
+			logger.info("Ranger KMS Database is enabled for storing master key.");
+			rangerMasterKey = new RangerMasterKey(daoManager);
+		} else {
+			logger.info("Ranger KMS HSM is enabled for storing master key.");
+			rangerMasterKey = new RangerHSM(conf);
+			String partitionPasswd = conf.get(HSM_PARTITION_PASSWORD);
+			if (partitionPasswd == null || partitionPasswd.trim().equals("")
+					|| partitionPasswd.trim().equals("_")
+					|| partitionPasswd.trim().equals("crypted")) {
+				throw new IOException("Partition Password doesn't exists");
+			}
+		}
+
+		if (conf != null && StringUtils.isNotEmpty(conf.get(KEYSECURE_ENABLED))
+				&& conf.get(KEYSECURE_ENABLED).equalsIgnoreCase("true")) {
+			getFromJceks(conf, CREDENTIAL_PATH, KEYSECURE_PASSWORD_ALIAS,
+					KEYSECURE_PASSWORD);
+			String keySecureLoginCred = conf.get(KEYSECURE_USERNAME).trim()
+					+ ":" + conf.get(KEYSECURE_PASSWORD);
+			conf.set(KEYSECURE_LOGIN, keySecureLoginCred);
+
+			rangerMasterKey = new RangerSafenetKeySecure(conf);
+
+			dbStore = new RangerKeyStore(daoManager);
+			// generate master key on key secure server
+			rangerMasterKey.generateMasterKey(password);
+			try {
+				masterKey = rangerMasterKey.getMasterKey(password)
+						.toCharArray();
+			} catch (Exception ex) {
+				throw new Exception(
+						"Error while getting Safenet KeySecure master key "
+								+ ex);
+			}
+
+		} else if (conf != null
+				&& StringUtils.isNotEmpty(conf.get(AZURE_KEYVAULT_ENABLED))
+				&& conf.get(AZURE_KEYVAULT_ENABLED).equalsIgnoreCase("true")) {
+			azureKeyVaultEnabled = true;
+			getFromJceks(conf, CREDENTIAL_PATH, AZURE_CLIENT_SECRET_ALIAS,
+					AZURE_CLIENT_SECRET);
+			String azureClientId = conf.get(AZURE_CLIENT_ID);
+			if (StringUtils.isEmpty(azureClientId)) {
+				throw new Exception(
+						"Azure Key Vault is enabled and client id is not configured");
+			}
+			String azureClientSecret = conf.get(AZURE_CLIENT_SECRET);
+			AzureKeyVaultClientAuthenticator azureKVClientAuthenticator;
+			KeyVaultClient kvClient = null;
+			if (conf != null
+					&& StringUtils.isNotEmpty(conf.get(AZURE_KEYVAULT_SSL_ENABLED))
+					&& conf.get(AZURE_KEYVAULT_SSL_ENABLED).equalsIgnoreCase("false")) {
+				try {
+					if (StringUtils.isEmpty(azureClientSecret)) {
+						throw new Exception(
+								"Azure Key Vault is enabled in non SSL mode and client password/secret is not configured");
+					}
+					azureKVClientAuthenticator = new AzureKeyVaultClientAuthenticator(
+							azureClientId, azureClientSecret);
+					kvClient = new KeyVaultClient(azureKVClientAuthenticator);
+				} catch (Exception ex) {
+					throw new Exception(
+							"Error while getting key vault client object with client id and client secret : "
+									+ ex);
+				}
+			} else {
+				try {
+					azureKVClientAuthenticator = new AzureKeyVaultClientAuthenticator(
+							azureClientId);
+					String keyVaultCertPath = conf
+							.get(AZURE_KEYVAULT_CERTIFICATE_PATH);
+					if (StringUtils.isEmpty(keyVaultCertPath)) {
+						throw new Exception(
+								"Azure Key Vault is enabled in SSL mode. Please provide certificate path for authentication.");
+					}
+					String keyVaultCertPassword = conf
+							.get(AZURE_KEYVAULT_CERTIFICATE_PASSWORD);
+
+					kvClient = !StringUtils.isEmpty(keyVaultCertPassword) ? azureKVClientAuthenticator
+							.getAuthentication(keyVaultCertPath,
+									keyVaultCertPassword)
+							: azureKVClientAuthenticator.getAuthentication(
+									keyVaultCertPath, "");
+				} catch (Exception ex) {
+					throw new Exception(
+							"Error while getting key vault client object with client id and certificate. Error :  : "
+									+ ex);
+				}
+			}
+			boolean success = false;
+			if (kvClient != null) {
+				try {
+					dbStore = new RangerKeyStore(daoManager, conf, kvClient);
+					rangerMasterKey = new RangerKeyVaultKeyGenerator(conf,
+							kvClient);
+					if (rangerMasterKey != null) {
+						success = rangerMasterKey.generateMasterKey(password);
+					}
+				} catch (Exception ex) {
+					throw new Exception(
+							"Error while generating master key and master key secret in Azure key vault. Error :  : "
+									+ ex);
+				}
+			} else {
+				throw new Exception(
+						"Unable to get Key Vault Client. Please check the azure credentials.");
+			}
+			if (success) {
+				try {
+					/* Master key not exportable from key vault */
+					masterKey = null;
+				} catch (Exception ex) {
+					throw new Exception(
+							"Error while getting Azure Master key Secret. Error : "
+									+ ex);
+				}
+			}
+
+		} else {
+			dbStore = new RangerKeyStore(daoManager);
+			rangerMasterKey.generateMasterKey(password);
+			// code to retrieve rangerMasterKey password
+			try {
+				masterKey = rangerMasterKey.getMasterKey(password)
+						.toCharArray();
+			} catch (Exception ex) {
+				throw new Exception("Error while getting Ranger Master key "
+						+ ex);
+			}
+		}
+		reloadKeys();
+		ReadWriteLock lock = new ReentrantReadWriteLock(true);
+		readLock = lock.readLock();
+	}
+
+	public static Configuration getDBKSConf() {
+		Configuration newConfig = getConfiguration(true, DBKS_SITE_XML);
+		getFromJceks(newConfig, CREDENTIAL_PATH, MK_CREDENTIAL_ALIAS,
+				ENCRYPTION_KEY);
+		getFromJceks(newConfig, CREDENTIAL_PATH, DB_CREDENTIAL_ALIAS,
+				DB_PASSWORD);
+		return newConfig;
+
+	}
+
+	static Configuration getConfiguration(boolean loadHadoopDefaults,
+			String... resources) {
+		Configuration conf = new Configuration(loadHadoopDefaults);
+		String confDir = System.getProperty(KMS_CONFIG_DIR);
+		if (confDir != null) {
+			try {
+				Path confPath = new Path(confDir);
+				if (!confPath.isUriPathAbsolute()) {
+					throw new RuntimeException("System property '"
+							+ KMS_CONFIG_DIR + "' must be an absolute path: "
+							+ confDir);
+				}
+				for (String resource : resources) {
+					conf.addResource(new URL("file://"
+							+ new Path(confDir, resource).toUri()));
+				}
+			} catch (MalformedURLException ex) {
+				logger.error(
+						"==> RangerKeyStoreProvider.getConfiguration() error : ",
+						ex);
+				throw new RuntimeException(ex);
+			}
+		} else {
+			for (String resource : resources) {
+				conf.addResource(resource);
+			}
+		}
+		return conf;
+	}
+
+	private void loadKeys(char[] masterKey) throws NoSuchAlgorithmException,
+			CertificateException, IOException {
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStoreProvider.loadKeys()");
+		}
+		dbStore.engineLoad(null, masterKey);
+	}
+
+	@Override
+	public KeyVersion createKey(String name, byte[] material, Options options)
+			throws IOException {
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStoreProvider.createKey()");
+		}
+		reloadKeys();
+		if (dbStore.engineContainsAlias(name) || cache.containsKey(name)) {
+			throw new IOException("Key " + name + " already exists");
+		}
+
+		if (dbStore.engineContainsAlias(name) || cache.containsKey(name)) {
+			throw new IOException("Key " + name + " already exists");
+		}
+		Metadata meta = new Metadata(options.getCipher(),
+				options.getBitLength(), options.getDescription(),
+				options.getAttributes(), new Date(), 1);
+
+		if (options.getBitLength() != 8 * material.length) {
+			throw new IOException("Wrong key length. Required "
+					+ options.getBitLength() + ", but got "
+					+ (8 * material.length));
+		}
+		cache.put(name, meta);
+		String versionName = buildVersionName(name, 0);
+		if (logger.isDebugEnabled()) {
+			logger.debug("<== RangerKeyStoreProvider.createKey()");
+		}
+		return innerSetKeyVersion(name, versionName, material,
+				meta.getCipher(), meta.getBitLength(), meta.getDescription(),
+				meta.getVersions(), meta.getAttributes());
+	}
+
+	KeyVersion innerSetKeyVersion(String name, String versionName,
+			byte[] material, String cipher, int bitLength, String description,
+			int version, Map<String, String> attributes) throws IOException {
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStoreProvider.innerSetKeyVersion()");
+			logger.debug("name : " + name + " and versionName : " + versionName);
+		}
+		try {
+			String attribute = JsonUtilsV2.mapToJson(attributes);
+			if (azureKeyVaultEnabled) {
+				dbStore.addSecureKeyByteEntry(versionName, new SecretKeySpec(
+						material, cipher), cipher, bitLength, description,
+						version, attribute);
+			} else {
+				dbStore.addKeyEntry(versionName, new SecretKeySpec(material,
+						cipher), masterKey, cipher, bitLength, description,
+						version, attribute);
+			}
+		} catch (Exception e) {
+			throw new IOException("Can't store key " + versionName, e);
+		}
+		changed = true;
+		if (logger.isDebugEnabled()) {
+			logger.debug("<== RangerKeyStoreProvider.innerSetKeyVersion()");
+		}
+		return new KeyVersion(name, versionName, material);
+	}
+
+	@Override
+	public void deleteKey(String name) throws IOException {
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStoreProvider.deleteKey(" + name + ")");
+		}
+		reloadKeys();
+		Metadata meta = getMetadata(name);
+		if (meta == null) {
+			throw new IOException("Key " + name + " does not exist");
+		}
+		for (int v = 0; v < meta.getVersions(); ++v) {
+			String versionName = buildVersionName(name, v);
+			try {
+				if (dbStore.engineContainsAlias(versionName)) {
+					dbStore.engineDeleteEntry(versionName);
+				}
+			} catch (KeyStoreException e) {
+				throw new IOException("Problem removing " + versionName, e);
+			}
+		}
+		try {
+			if (dbStore.engineContainsAlias(name)) {
+				dbStore.engineDeleteEntry(name);
+			}
+		} catch (KeyStoreException e) {
+			throw new IOException("Problem removing " + name + " from " + this,
+					e);
+		}
+		cache.remove(name);
+		changed = true;
+	}
+
+	@Override
+	public void flush() throws IOException {
+		try {
+			if (!changed) {
+				return;
+			}
+			// put all of the updates into the db
+			for (Map.Entry<String, Metadata> entry : cache.entrySet()) {
+				try {
+					Metadata metadata = entry.getValue();
+					String attributes = JsonUtilsV2.mapToJson(metadata
+							.getAttributes());
+					if (azureKeyVaultEnabled) {
+						Key ezkey = new KeyMetadata(metadata);
+						if (ezkey.getEncoded().length == 0) {
+							KeyGenerator keyGenerator = KeyGenerator
+									.getInstance(metadata.getAlgorithm());
+							keyGenerator.init(metadata.getBitLength());
+							byte[] key = keyGenerator.generateKey()
+									.getEncoded();
+							ezkey = new SecretKeySpec(key, metadata.getCipher());
+						}
+
+						dbStore.addSecureKeyByteEntry(entry.getKey(), ezkey,
+								metadata.getCipher(), metadata.getBitLength(),
+								metadata.getDescription(),
+								metadata.getVersions(), attributes);
+					} else {
+						dbStore.addKeyEntry(entry.getKey(), new KeyMetadata(
+								metadata), masterKey, metadata.getAlgorithm(),
+								metadata.getBitLength(), metadata
+										.getDescription(), metadata
+										.getVersions(), attributes);
+					}
+				} catch (Exception e) {
+					throw new IOException("Can't set metadata key "
+							+ entry.getKey(), e);
+				}
+			}
+			try {
+				dbStore.engineStore(null, masterKey);
+				reloadKeys();
+			} catch (NoSuchAlgorithmException e) {
+				throw new IOException("No such algorithm storing key", e);
+			} catch (CertificateException e) {
+				throw new IOException("Certificate exception storing key", e);
+			}
+			changed = false;
+		} catch (IOException ioe) {
+			cache.clear();
+			reloadKeys();
+			throw ioe;
+		}
+	}
+
+	@Override
+	public KeyVersion getKeyVersion(String versionName) throws IOException {
+		readLock.lock();
+
+		try {
+			if (azureKeyVaultEnabled) {
+				byte[] decryptKeyByte = null;
+				try {
+					if (!dbStore.engineContainsAlias(versionName)) {
+						dbStore.engineLoad(null, masterKey);
+						if (!dbStore.engineContainsAlias(versionName)) {
+							return null;
+						}
+					}
+					try {
+						decryptKeyByte = dbStore
+								.engineGetDecryptedZoneKeyByte(versionName);
+					} catch (Exception e) {
+						throw new RuntimeException(
+								"Error while getting decrypted key." + e);
+					}
+					if (decryptKeyByte == null || decryptKeyByte.length == 0) {
+						return null;
+					} else {
+						return new KeyVersion(getBaseName(versionName),
+								versionName, decryptKeyByte);
+					}
+
+				} catch (NoSuchAlgorithmException e) {
+					throw new IOException("Can't get algorithm for key "
+							+ e.getMessage());
+				} catch (CertificateException e) {
+
+					throw new IOException("Certificate exception storing key",
+							e);
+				}
+			} else {
+				SecretKeySpec key = null;
+				try {
+					if (!dbStore.engineContainsAlias(versionName)) {
+						dbStore.engineLoad(null, masterKey);
+						if (!dbStore.engineContainsAlias(versionName)) {
+							return null;
+						}
+					}
+					key = (SecretKeySpec) dbStore.engineGetKey(versionName,
+							masterKey);
+				} catch (NoSuchAlgorithmException e) {
+					throw new IOException("Can't get algorithm for key " + key,
+							e);
+				} catch (UnrecoverableKeyException e) {
+					throw new IOException("Can't recover key " + key, e);
+				} catch (CertificateException e) {
+					throw new IOException("Certificate exception storing key",
+							e);
+				}
+				if (key == null) {
+					return null;
+				} else {
+					return new KeyVersion(getBaseName(versionName),
+							versionName, key.getEncoded());
+				}
+			}
+		} finally {
+			readLock.unlock();
+		}
+	}
+
+	@Override
+	public List<KeyVersion> getKeyVersions(String name) throws IOException {
+		List<KeyVersion> list = new ArrayList<KeyVersion>();
+		Metadata km = getMetadata(name);
+		if (km != null) {
+			int latestVersion = km.getVersions();
+			KeyVersion v = null;
+			String versionName = null;
+			for (int i = 0; i < latestVersion; i++) {
+				versionName = buildVersionName(name, i);
+				v = getKeyVersion(versionName);
+				if (v != null) {
+					list.add(v);
+				}
+			}
+		}
+		return list;
+	}
+
+	@Override
+	public List<String> getKeys() throws IOException {
+		ArrayList<String> list = new ArrayList<String>();
+		String alias = null;
+		reloadKeys();
+		Enumeration<String> e = dbStore.engineAliases();
+		while (e.hasMoreElements()) {
+			alias = e.nextElement();
+			// only include the metadata key names in the list of names
+			if (!alias.contains("@")) {
+				list.add(alias);
+			}
+		}
+		return list;
+	}
+
+	@Override
+	public Metadata getMetadata(String name) throws IOException {
+		try {
+			readLock.lock();
+			if (cache.containsKey(name)) {
+				Metadata meta = cache.get(name);
+				return meta;
+			}
+			try {
+				if (!dbStore.engineContainsAlias(name)) {
+					dbStore.engineLoad(null, masterKey);
+					if (!dbStore.engineContainsAlias(name)) {
+						return null;
+					}
+				}
+				if (azureKeyVaultEnabled) {
+					Metadata meta = dbStore.engineGetKeyMetadata(name);
+					if (meta != null) {
+						cache.put(name, meta);
+						return meta;
+					}
+				} else {
+					Key key = dbStore.engineGetKey(name, masterKey);
+					if (key != null) {
+						Metadata meta = ((KeyMetadata) key).metadata;
+						cache.put(name, meta);
+						return meta;
+					}
+				}
+			} catch (NoSuchAlgorithmException e) {
+				throw new IOException("Can't get algorithm for " + name, e);
+			} catch (UnrecoverableKeyException e) {
+				throw new IOException("Can't recover key for " + name, e);
+			}
+			return null;
+		} catch (Exception e) {
+			throw new IOException("Please try again ", e);
+		} finally {
+			readLock.unlock();
+		}
+	}
+
+	@Override
+	public KeyVersion rollNewVersion(String name, byte[] material)
+			throws IOException {
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStoreProvider.rollNewVersion()");
+		}
+		reloadKeys();
+		Metadata meta = getMetadata(name);
+		if (meta == null) {
+			throw new IOException("Key " + name + " not found");
+		}
+		if (meta.getBitLength() != 8 * material.length) {
+			throw new IOException("Wrong key length. Required "
+					+ meta.getBitLength() + ", but got "
+					+ (8 * material.length));
+		}
+		int nextVersion = meta.addVersion();
+		String versionName = buildVersionName(name, nextVersion);
+		return innerSetKeyVersion(name, versionName, material,
+				meta.getCipher(), meta.getBitLength(), meta.getDescription(),
+				meta.getVersions(), meta.getAttributes());
+	}
+
+	private static void getFromJceks(Configuration conf, String path,
+			String alias, String key) {
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStoreProvider.getFromJceks()");
+		}
+		// update credential from keystore
+		if (conf != null) {
+			String pathValue = conf.get(path);
+			String aliasValue = conf.get(alias);
+			if (pathValue != null && aliasValue != null) {
+				String xaDBPassword = CredentialReader.getDecryptedString(
+						pathValue.trim(), aliasValue.trim());
+				if (xaDBPassword != null && !xaDBPassword.trim().isEmpty()
+						&& !xaDBPassword.trim().equalsIgnoreCase("none")) {
+					conf.set(key, xaDBPassword);
+				} else {
+					logger.info("Credential keystore password not applied for KMS; clear text password shall be applicable");
+				}
+			}
+		}
+	}
+
+	private void reloadKeys() throws IOException {
+		if (logger.isDebugEnabled()) {
+			logger.debug("==> RangerKeyStoreProvider.reloadKeys()");
+		}
+		try {
+			cache.clear();
+			loadKeys(masterKey);
+		} catch (NoSuchAlgorithmException e) {
+			throw new IOException("Can't load Keys");
+		} catch (CertificateException e) {
+			throw new IOException("Can't load Keys");
+		}
+	}
+
+	/**
+	 * The factory to create JksProviders, which is used by the ServiceLoader.
+	 */
+	public static class Factory extends KeyProviderFactory {
+		@Override
+		public KeyProvider createProvider(URI providerName, Configuration conf)
+				throws IOException {
+			try {
+				if (SCHEME_NAME.equals(providerName.getScheme())) {
+					return new RangerKeyStoreProvider(conf);
+				}
+			} catch (Throwable e) {
+				logger.error(
+						"==> RangerKeyStoreProvider.reloadKeys() error : ", e);
+			}
+			return null;
+		}
+	}
+
+	/**
+	 * An adapter between a KeyStore Key and our Metadata. This is used to store
+	 * the metadata in a KeyStore even though isn't really a key.
+	 */
+	public static class KeyMetadata implements Key, Serializable {
+		Metadata metadata;
+		private final static long serialVersionUID = 8405872419967874451L;
+
+		private KeyMetadata(Metadata meta) {
+			this.metadata = meta;
+		}
+
+		@Override
+		public String getAlgorithm() {
+			return metadata.getCipher();
+		}
+
+		@Override
+		public String getFormat() {
+			return KEY_METADATA;
+		}
+
+		@Override
+		public byte[] getEncoded() {
+			return new byte[0];
+		}
+
+		private void writeObject(ObjectOutputStream out) throws IOException {
+			byte[] serialized = metadata.serialize();
+			out.writeInt(serialized.length);
+			out.write(serialized);
+		}
+
+		private void readObject(ObjectInputStream in) throws IOException,
+				ClassNotFoundException {
+			byte[] buf = new byte[in.readInt()];
+			in.readFully(buf);
+			metadata = new Metadata(buf);
+		}
+
+	}
+
 }
diff --git a/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyVaultKeyGenerator.java b/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyVaultKeyGenerator.java
new file mode 100644
index 0000000..854d7f0
--- /dev/null
+++ b/kms/src/main/java/org/apache/hadoop/crypto/key/RangerKeyVaultKeyGenerator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.crypto.key;
+
+import com.microsoft.azure.keyvault.KeyVaultClient;
+import com.microsoft.azure.keyvault.models.Attributes;
+import com.microsoft.azure.keyvault.models.KeyAttributes;
+import com.microsoft.azure.keyvault.models.KeyOperationResult;
+import com.microsoft.azure.keyvault.models.custom.KeyBundle;
+import com.microsoft.azure.keyvault.requests.CreateKeyRequest;
+import com.microsoft.azure.keyvault.webkey.JsonWebKeyEncryptionAlgorithm;
+import com.microsoft.azure.keyvault.webkey.JsonWebKeyType;
+
+import java.security.Key;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+
+public class RangerKeyVaultKeyGenerator implements RangerKMSMKI {
+
+	static final Logger logger = Logger
+			.getLogger(RangerKeyVaultKeyGenerator.class);
+	private static final String AZURE_KEYVAULT_URL = "ranger.kms.azurekeyvault.url";
+	private static final String AZURE_MASTER_KEY_ALIAS = "ranger.kms.azure.masterkey.name";
+	private static final String AZURE_MASTER_KEY_TYPE = "ranger.kms.azure.masterkey.type";
+	private static final String ZONE_KEY_ENCRYPTION_ALGO = "ranger.kms.azure.zonekey.encryption.algorithm";
+	private String keyVaultURL;
+	private String azureMasterKey;
+	private String azureMasterKeyType;
+	private String zoneKeyEncryptionAlgo;
+	private KeyVaultClient keyVaultClient;
+	private KeyBundle masterKeyBundle;
+
+	public RangerKeyVaultKeyGenerator(Configuration conf,
+			KeyVaultClient kvClient) {
+		this.keyVaultURL = conf.get(AZURE_KEYVAULT_URL);
+		this.azureMasterKey = conf.get(AZURE_MASTER_KEY_ALIAS);
+		this.azureMasterKeyType = conf.get(AZURE_MASTER_KEY_TYPE);
+		this.zoneKeyEncryptionAlgo = conf.get(ZONE_KEY_ENCRYPTION_ALGO);
+		this.keyVaultClient = kvClient;
+	}
+
+	@Override
+	public boolean generateMasterKey(String password) throws Exception {
+		if (keyVaultClient == null) {
+			throw new Exception(
+					"Key Vault Client is null. Please check the azure related configuration.");
+		}
+		try {
+			masterKeyBundle = keyVaultClient
+					.getKey(keyVaultURL, azureMasterKey);
+
+		} catch (Exception ex) {
+			throw new Exception(
+					"Error while getting existing master key from Azure.  Master Key Name : "
+							+ azureMasterKey + " . Key Vault URL : "
+							+ keyVaultURL + " . Error : " + ex.getMessage());
+		}
+		if (masterKeyBundle == null) {
+			try {
+				JsonWebKeyType keyType;
+				switch (azureMasterKeyType) {
+				case "RSA":
+					keyType = JsonWebKeyType.RSA;
+					break;
+
+				case "RSA_HSM":
+					keyType = JsonWebKeyType.RSA_HSM;
+					break;
+
+				case "EC":
+					keyType = JsonWebKeyType.EC;
+					break;
+
+				case "EC_HSM":
+					keyType = JsonWebKeyType.EC_HSM;
+					break;
+
+				case "OCT":
+					keyType = JsonWebKeyType.OCT;
+					break;
+
+				default:
+					keyType = JsonWebKeyType.RSA;
+				}
+
+				Attributes masterKeyattribute = new KeyAttributes()
+						.withEnabled(true).withNotBefore(new DateTime());
+
+				CreateKeyRequest createKeyRequest = new CreateKeyRequest.Builder(
+						keyVaultURL, azureMasterKey, keyType).withAttributes(
+						masterKeyattribute).build();
+				masterKeyBundle = keyVaultClient.createKeyAsync(
+						createKeyRequest, null).get();
+				return true;
+			} catch (Exception ex) {
+				throw new Exception("Error while creating master key  : "
+						+ ex.getMessage());
+			}
+		} else {
+			logger.info("Azure Master key exist with name :" + azureMasterKey
+					+ " with key identifier " + masterKeyBundle.key().kid());
+			return true;
+		}
+	}
+
+	public byte[] encryptZoneKey(Key zoneKey) throws Exception {
+		JsonWebKeyEncryptionAlgorithm keyEncryptionAlgo = getZoneKeyEncryptionAlgo();
+		KeyOperationResult encryptResult = null;
+		
+		if (masterKeyBundle == null) {
+			masterKeyBundle = keyVaultClient
+					.getKey(keyVaultURL, azureMasterKey);
+		}
+		try {
+			encryptResult = keyVaultClient.encryptAsync(
+					masterKeyBundle.key().kid(), keyEncryptionAlgo,
+					zoneKey.getEncoded(), null).get();
+
+		} catch (Exception e) {
+			throw new Exception("Error while encrypting zone key." + e);
+		}
+		return encryptResult.result();
+	}
+
+	public byte[] dencryptZoneKey(byte[] encryptedByte) throws Exception {
+		JsonWebKeyEncryptionAlgorithm keyEncryptionAlgo = getZoneKeyEncryptionAlgo();
+		if (masterKeyBundle == null) {
+			masterKeyBundle = keyVaultClient
+					.getKey(keyVaultURL, azureMasterKey);
+		}
+		KeyOperationResult decryptResult = null;
+		try {
+			decryptResult = keyVaultClient.decryptAsync(
+					masterKeyBundle.key().kid(), keyEncryptionAlgo,
+					encryptedByte, null).get();
+
+		} catch (Exception e) {
+			throw new Exception("Error while decrypting zone key." + e);
+		}
+		return decryptResult.result();
+	}
+
+	private JsonWebKeyEncryptionAlgorithm getZoneKeyEncryptionAlgo() {
+		JsonWebKeyEncryptionAlgorithm keyEncryptionAlgo;
+		switch (zoneKeyEncryptionAlgo) {
+		case "RSA_OAEP":
+			keyEncryptionAlgo = JsonWebKeyEncryptionAlgorithm.RSA_OAEP;
+			break;
+
+		case "RSA_OAEP_256":
+			keyEncryptionAlgo = JsonWebKeyEncryptionAlgorithm.RSA_OAEP_256;
+			break;
+
+		case "RSA1_5":
+			keyEncryptionAlgo = JsonWebKeyEncryptionAlgorithm.RSA1_5;
+			break;
+
+		default:
+			keyEncryptionAlgo = JsonWebKeyEncryptionAlgorithm.RSA_OAEP;
+		}
+		return keyEncryptionAlgo;
+	}
+
+	@Override
+	public String getMasterKey(String masterKeySecretName) {
+		/*
+		 * This method is not require for Azure Key Vault because we can't get
+		 * key outside of key vault
+		 */
+		return null;
+	}
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 9356780..bb2e847 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,19 @@
         <presto.guice.version>4.2.2</presto.guice.version>
         <presto.guava.version>26.0-jre</presto.guava.version>
         <presto.validation-api.version>2.0.1.Final</presto.validation-api.version>
+
+	<!-- Azure Key Vault dependencies -->
+        <com.microsoft.azure.version>1.22.0</com.microsoft.azure.version>
+        <com.microsoft.azure.azure-keyvault.version>1.2.1</com.microsoft.azure.azure-keyvault.version>
+        <com.microsoft.azure.azure-mgmt-keyvault.version>1.22.0</com.microsoft.azure.azure-mgmt-keyvault.version>
+        <com.microsoft.rest.client-runtime.version>1.6.7</com.microsoft.rest.client-runtime.version>
+        <com.microsoft.azure.azure-client-runtime.version>1.6.7</com.microsoft.azure.azure-client-runtime.version>
+        <com.microsoft.azure.adal4j.version>1.6.4</com.microsoft.azure.adal4j.version>
+        <io.reactivex.rxjava.version>1.3.8</io.reactivex.rxjava.version>
+        <net.minidev.asm.version>1.0.2</net.minidev.asm.version>
+        <org.bouncycastle.bcprov-jdk15on>1.59</org.bouncycastle.bcprov-jdk15on>
+        <org.bouncycastle.bcpkix-jdk15on>1.59</org.bouncycastle.bcpkix-jdk15on>
+
     </properties>
     <profiles>
         <profile>
diff --git a/src/main/assembly/kms.xml b/src/main/assembly/kms.xml
index faa5a04..c1d5c1f 100755
--- a/src/main/assembly/kms.xml
+++ b/src/main/assembly/kms.xml
@@ -110,6 +110,57 @@
                             <include>com.fasterxml.jackson.core:jackson-core</include>
                             <include>com.fasterxml.jackson.core:jackson-annotations</include>
                             <include>com.fasterxml.jackson.core:jackson-databind</include>
+                            <include>com.microsoft.azure:azure:jar:${com.microsoft.azure.version}</include>
+                            <include>com.microsoft.azure:azure-keyvault:jar:${com.microsoft.azure.azure-keyvault.version}</include>
+                            <include>com.microsoft.azure:azure-mgmt-keyvault:jar:${com.microsoft.azure.azure-mgmt-keyvault.version}</include>
+                            <include>com.microsoft.rest:client-runtime:jar:${com.microsoft.rest.client-runtime.version}</include>
+                            <include>com.microsoft.azure:azure-client-runtime:jar:${com.microsoft.azure.azure-client-runtime.version}</include>
+                            <include>com.microsoft.azure:adal4j:jar:${com.microsoft.azure.adal4j.version}</include>
+                            <include>io.reactivex:rxjava:jar:${io.reactivex.rxjava.version}</include>
+                            <include>com.squareup.okhttp3:okhttp</include>
+                            <include>com.squareup.okio:okio</include>
+                            <include>com.squareup.retrofit2:retrofit</include>
+                            <include>com.squareup.retrofit2:adapter-rxjava</include>
+                            <include>com.squareup.okhttp3:okhttp-urlconnection</include>
+                            <include>com.fasterxml.jackson.datatype:jackson-datatype-joda</include>
+                            <include>joda-time:okhttp-urlconnection</include>
+                            <include>joda-time:joda-time</include>
+                            <include>com.nimbusds:oauth2-oidc-sdk</include>
+                            <include>net.minidev:json-smart</include>
+                            <include>net.minidev:asm:jar:${net.minidev.asm.version}</include>
+                            <include>javax.mail:javax.mail-api</include>
+                            <include>com.sun.mail:javax.mail</include>
+                            <include>com.nimbusds:nimbus-jose-jwt</include>
+                            <include>org.apache.commons:commons-lang3</include>
+                            <include>com.microsoft.azure:azure-keyvault-webkey</include>
+                            <include>org.bouncycastle:bcprov-jdk15on</include>
+                            <include>org.bouncycastle:bcpkix-jdk15on</include>
+                            <include>com.microsoft.azure:azure-mgmt-resources</include>
+                            <include>com.microsoft.azure:azure-client-authentication</include>
+                            <include>com.microsoft.azure:azure-mgmt-graph-rbac</include>
+                            <include>com.microsoft.azure:azure-mgmt-storage</include>
+                            <include>com.microsoft.azure:azure-mgmt-compute</include>
+                            <include>com.microsoft.azure:azure-mgmt-network</include>
+                            <include>com.microsoft.azure:azure-mgmt-batch</include>
+                            <include>com.microsoft.azure:azure-mgmt-batchai</include>
+                            <include>com.microsoft.azure:azure-mgmt-trafficmanager</include>
+                            <include>com.microsoft.azure:azure-mgmt-redis</include>
+                            <include>com.microsoft.azure:azure-mgmt-cdn</include>
+                            <include>com.microsoft.azure:azure-mgmt-dns</include>
+                            <include>com.microsoft.azure:azure-mgmt-appservice</include>
+                            <include>com.microsoft.azure:azure-mgmt-sql</include>
+                            <include>com.microsoft.azure:azure-mgmt-servicebus</include>
+                            <include>com.microsoft.azure:azure-mgmt-containerinstance</include>
+                            <include>com.microsoft.azure:azure-mgmt-containerregistry</include>
+                            <include>com.microsoft.azure:azure-mgmt-containerservice</include>
+                            <include>com.microsoft.azure:azure-mgmt-cosmosdb</include>
+                            <include>com.microsoft.azure:azure-mgmt-search</include>
+                            <include>com.microsoft.azure:azure-mgmt-locks</include>
+                            <include>com.microsoft.azure:azure-mgmt-msi</include>
+                            <include>com.microsoft.azure:azure-mgmt-monitor</include>
+                            <include>com.microsoft.azure:azure-mgmt-eventhub</include>
+                            <include>com.microsoft.azure:azure-mgmt-eventhub</include>
+                            <include>com.microsoft.azure:azure-keyvault-cryptography</include>
                         </includes>
                     </dependencySet>
                 </dependencySets>
@@ -118,7 +169,7 @@
                 <include>org.apache.ranger:ranger-kms</include>
             </includes>
         </moduleSet>
-
+	
         <moduleSet>
             <binaries>
                 <includeDependencies>false</includeDependencies>
@@ -337,7 +388,8 @@
                 <include>VerifyIsDBMasterkeyCorrect.sh</include>
                 <include>VerifyIsHSMMasterkeyCorrect.sh</include>
                 <include>DBMKTOKEYSECURE.sh</include>
-                                <include>KEYSECUREMKTOKMSDB.sh</include>
+                <include>DBMKTOAZUREKEYVAULT.sh</include>
+                <include>KEYSECUREMKTOKMSDB.sh</include>
             </includes>
             <fileMode>544</fileMode>
         </fileSet>