You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/08/23 02:57:33 UTC

[22/36] hadoop git commit: HADOOP-14705. Add batched interface reencryptEncryptedKeys to KMS.

HADOOP-14705. Add batched interface reencryptEncryptedKeys to KMS.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ec5acc7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ec5acc7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ec5acc7

Branch: refs/heads/YARN-5734
Commit: 4ec5acc70418a3f2327cf83ecae1789a057fdd99
Parents: 27ab5f7
Author: Xiao Chen <xi...@apache.org>
Authored: Tue Aug 22 07:46:46 2017 -0700
Committer: Xiao Chen <xi...@apache.org>
Committed: Tue Aug 22 07:47:39 2017 -0700

----------------------------------------------------------------------
 .../crypto/key/KeyProviderCryptoExtension.java  | 147 ++++++++++++++++---
 .../crypto/key/kms/KMSClientProvider.java       | 145 +++++++-----------
 .../hadoop/crypto/key/kms/KMSRESTConstants.java |   1 +
 .../key/kms/LoadBalancingKMSClientProvider.java |  20 +++
 .../java/org/apache/hadoop/util/KMSUtil.java    | 134 +++++++++++++++++
 .../key/TestKeyProviderCryptoExtension.java     | 113 ++++++++++----
 ...rKeyGeneratorKeyProviderCryptoExtension.java |   6 +
 .../hadoop/crypto/key/kms/server/KMS.java       | 113 +++++++++++---
 .../crypto/key/kms/server/KMSJSONReader.java    |   8 +-
 .../key/kms/server/KMSServerJSONUtils.java      |  34 +----
 .../hadoop/crypto/key/kms/server/KMSWebApp.java |  18 +++
 .../kms/server/KeyAuthorizationKeyProvider.java |  19 +++
 .../hadoop-kms/src/site/markdown/index.md.vm    |  60 +++++++-
 .../hadoop/crypto/key/kms/server/TestKMS.java   |  46 +++++-
 .../crypto/key/kms/server/TestKMSAudit.java     |   7 +-
 15 files changed, 673 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java
index ea5ff28..8c879b3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.GeneralSecurityException;
 import java.security.SecureRandom;
+import java.util.List;
+import java.util.ListIterator;
 
 import javax.crypto.Cipher;
 import javax.crypto.spec.IvParameterSpec;
@@ -247,6 +249,25 @@ public class KeyProviderCryptoExtension extends
      */
     EncryptedKeyVersion reencryptEncryptedKey(EncryptedKeyVersion ekv)
         throws IOException, GeneralSecurityException;
+
+    /**
+     * Batched version of {@link #reencryptEncryptedKey(EncryptedKeyVersion)}.
+     * <p>
+     * For each encrypted key version, re-encrypts an encrypted key version,
+     * using its initialization vector and key material, but with the latest
+     * key version name of its key name. If the latest key version name in the
+     * provider is the same as the one that encrypted the passed-in encrypted
+     * key version, the same encrypted key version is kept in the list.
+     * <p>
+     * NOTE: The generated key is not stored by the <code>KeyProvider</code>
+     *
+     * @param  ekvs List of EncryptedKeyVersions; it is updated in place
+     * @throws IOException If any EncryptedKeyVersion could not be re-encrypted
+     * @throws GeneralSecurityException If any EncryptedKeyVersion could not be
+     *                            re-encrypted because of a cryptographic issue.
+     */
+    void reencryptEncryptedKeys(List<EncryptedKeyVersion> ekvs)
+        throws IOException, GeneralSecurityException;
   }
 
   private static class DefaultCryptoExtension implements CryptoExtension {
@@ -315,7 +336,7 @@ public class KeyProviderCryptoExtension extends
           .checkNotNull(ekNow, "KeyVersion name '%s' does not exist", ekName);
       Preconditions.checkArgument(ekv.getEncryptedKeyVersion().getVersionName()
               .equals(KeyProviderCryptoExtension.EEK),
-          "encryptedKey version name must be '%s', is '%s'",
+          "encryptedKey version name must be '%s', but found '%s'",
           KeyProviderCryptoExtension.EEK,
           ekv.getEncryptedKeyVersion().getVersionName());
 
@@ -336,30 +357,67 @@ public class KeyProviderCryptoExtension extends
     }
 
     @Override
-    public KeyVersion decryptEncryptedKey(
-        EncryptedKeyVersion encryptedKeyVersion) throws IOException,
-        GeneralSecurityException {
-      // Fetch the encryption key material
-      final String encryptionKeyVersionName =
-          encryptedKeyVersion.getEncryptionKeyVersionName();
-      final KeyVersion encryptionKey =
-          keyProvider.getKeyVersion(encryptionKeyVersionName);
-      Preconditions.checkNotNull(encryptionKey,
-          "KeyVersion name '%s' does not exist", encryptionKeyVersionName);
-      Preconditions.checkArgument(
-              encryptedKeyVersion.getEncryptedKeyVersion().getVersionName()
-                    .equals(KeyProviderCryptoExtension.EEK),
-                "encryptedKey version name must be '%s', is '%s'",
-                KeyProviderCryptoExtension.EEK,
-                encryptedKeyVersion.getEncryptedKeyVersion().getVersionName()
-            );
+    public void reencryptEncryptedKeys(List<EncryptedKeyVersion> ekvs)
+        throws IOException, GeneralSecurityException {
+      Preconditions.checkNotNull(ekvs, "Input list is null");
+      KeyVersion ekNow = null;
+      Decryptor decryptor = null;
+      Encryptor encryptor = null;
+      try (CryptoCodec cc = CryptoCodec.getInstance(keyProvider.getConf())) {
+        decryptor = cc.createDecryptor();
+        encryptor = cc.createEncryptor();
+        ListIterator<EncryptedKeyVersion> iter = ekvs.listIterator();
+        while (iter.hasNext()) {
+          final EncryptedKeyVersion ekv = iter.next();
+          Preconditions.checkNotNull(ekv, "EncryptedKeyVersion is null");
+          final String ekName = ekv.getEncryptionKeyName();
+          Preconditions.checkNotNull(ekName, "Key name is null");
+          Preconditions.checkNotNull(ekv.getEncryptedKeyVersion(),
+              "EncryptedKeyVersion is null");
+          Preconditions.checkArgument(
+              ekv.getEncryptedKeyVersion().getVersionName()
+                  .equals(KeyProviderCryptoExtension.EEK),
+              "encryptedKey version name must be '%s', but found '%s'",
+              KeyProviderCryptoExtension.EEK,
+              ekv.getEncryptedKeyVersion().getVersionName());
+
+          if (ekNow == null) {
+            ekNow = keyProvider.getCurrentKey(ekName);
+            Preconditions
+                .checkNotNull(ekNow, "Key name '%s' does not exist", ekName);
+          } else {
+            Preconditions.checkArgument(ekNow.getName().equals(ekName),
+                "All keys must have the same key name. Expected '%s' "
+                    + "but found '%s'", ekNow.getName(), ekName);
+          }
+
+          final String encryptionKeyVersionName =
+              ekv.getEncryptionKeyVersionName();
+          final KeyVersion encryptionKey =
+              keyProvider.getKeyVersion(encryptionKeyVersionName);
+          Preconditions.checkNotNull(encryptionKey,
+              "KeyVersion name '%s' does not exist", encryptionKeyVersionName);
+          if (encryptionKey.equals(ekNow)) {
+            // no-op if same key version
+            continue;
+          }
+
+          final KeyVersion ek =
+              decryptEncryptedKey(decryptor, encryptionKey, ekv);
+          iter.set(generateEncryptedKey(encryptor, ekNow, ek.getMaterial(),
+              ekv.getEncryptedKeyIv()));
+        }
+      }
+    }
 
+    private KeyVersion decryptEncryptedKey(final Decryptor decryptor,
+        final KeyVersion encryptionKey,
+        final EncryptedKeyVersion encryptedKeyVersion)
+        throws IOException, GeneralSecurityException {
       // Encryption key IV is determined from encrypted key's IV
       final byte[] encryptionIV =
           EncryptedKeyVersion.deriveIV(encryptedKeyVersion.getEncryptedKeyIv());
 
-      CryptoCodec cc = CryptoCodec.getInstance(keyProvider.getConf());
-      Decryptor decryptor = cc.createDecryptor();
       decryptor.init(encryptionKey.getMaterial(), encryptionIV);
       final KeyVersion encryptedKV =
           encryptedKeyVersion.getEncryptedKeyVersion();
@@ -372,11 +430,36 @@ public class KeyProviderCryptoExtension extends
       bbOut.flip();
       byte[] decryptedKey = new byte[keyLen];
       bbOut.get(decryptedKey);
-      cc.close();
       return new KeyVersion(encryptionKey.getName(), EK, decryptedKey);
     }
 
     @Override
+    public KeyVersion decryptEncryptedKey(
+        EncryptedKeyVersion encryptedKeyVersion)
+        throws IOException, GeneralSecurityException {
+      // Fetch the encryption key material
+      final String encryptionKeyVersionName =
+          encryptedKeyVersion.getEncryptionKeyVersionName();
+      final KeyVersion encryptionKey =
+          keyProvider.getKeyVersion(encryptionKeyVersionName);
+      Preconditions
+          .checkNotNull(encryptionKey, "KeyVersion name '%s' does not exist",
+              encryptionKeyVersionName);
+      Preconditions.checkArgument(
+          encryptedKeyVersion.getEncryptedKeyVersion().getVersionName()
+              .equals(KeyProviderCryptoExtension.EEK),
+          "encryptedKey version name must be '%s', but found '%s'",
+          KeyProviderCryptoExtension.EEK,
+          encryptedKeyVersion.getEncryptedKeyVersion().getVersionName());
+
+      try (CryptoCodec cc = CryptoCodec.getInstance(keyProvider.getConf())) {
+        final Decryptor decryptor = cc.createDecryptor();
+        return decryptEncryptedKey(decryptor, encryptionKey,
+            encryptedKeyVersion);
+      }
+    }
+
+    @Override
     public void warmUpEncryptedKeys(String... keyNames)
         throws IOException {
       // NO-OP since the default version does not cache any keys
@@ -471,6 +554,28 @@ public class KeyProviderCryptoExtension extends
   }
 
   /**
+   * Batched version of {@link #reencryptEncryptedKey(EncryptedKeyVersion)}.
+   * <p>
+   * For each encrypted key version, re-encrypts an encrypted key version,
+   * using its initialization vector and key material, but with the latest
+   * key version name of its key name. If the latest key version name in the
+   * provider is the same as the one that encrypted the passed-in encrypted
+   * key version, the same encrypted key version is kept in the list.
+   * <p>
+   * NOTE: The generated key is not stored by the <code>KeyProvider</code>
+   *
+   * @param  ekvs List containing the EncryptedKeyVersion's. It is updated in
+   *              place with the re-encrypted versions, in the same order.
+   * @throws IOException If any EncryptedKeyVersion could not be re-encrypted
+   * @throws GeneralSecurityException If any EncryptedKeyVersion could not be
+   *                            re-encrypted because of a cryptographic issue.
+   */
+  public void reencryptEncryptedKeys(List<EncryptedKeyVersion> ekvs)
+      throws IOException, GeneralSecurityException {
+    getExtension().reencryptEncryptedKeys(ekvs);
+  }
+
+  /**
    * Creates a <code>KeyProviderCryptoExtension</code> using a given
    * {@link KeyProvider}.
    * <p/>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
index 20ad58c..af0dd82 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
@@ -70,7 +70,6 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -84,6 +83,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
+import static org.apache.hadoop.util.KMSUtil.checkNotEmpty;
+import static org.apache.hadoop.util.KMSUtil.checkNotNull;
+import static org.apache.hadoop.util.KMSUtil.parseJSONEncKeyVersion;
+import static org.apache.hadoop.util.KMSUtil.parseJSONEncKeyVersions;
+import static org.apache.hadoop.util.KMSUtil.parseJSONKeyVersion;
+import static org.apache.hadoop.util.KMSUtil.parseJSONMetadata;
+
 /**
  * KMS client <code>KeyProvider</code> implementation.
  */
@@ -219,77 +225,11 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     }
   }
 
-  @SuppressWarnings("rawtypes")
-  private static List<EncryptedKeyVersion>
-      parseJSONEncKeyVersions(String keyName, List valueList) {
-    List<EncryptedKeyVersion> ekvs = new LinkedList<EncryptedKeyVersion>();
-    if (!valueList.isEmpty()) {
-      for (Object values : valueList) {
-        Map valueMap = (Map) values;
-        ekvs.add(parseJSONEncKeyVersion(keyName, valueMap));
-      }
-    }
-    return ekvs;
-  }
-
-  private static EncryptedKeyVersion parseJSONEncKeyVersion(String keyName,
-      Map valueMap) {
-    String versionName = checkNotNull(
-        (String) valueMap.get(KMSRESTConstants.VERSION_NAME_FIELD),
-        KMSRESTConstants.VERSION_NAME_FIELD);
-
-    byte[] iv = Base64.decodeBase64(checkNotNull(
-        (String) valueMap.get(KMSRESTConstants.IV_FIELD),
-        KMSRESTConstants.IV_FIELD));
-
-    Map encValueMap = checkNotNull((Map)
-            valueMap.get(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD),
-        KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD);
-
-    String encVersionName = checkNotNull((String)
-            encValueMap.get(KMSRESTConstants.VERSION_NAME_FIELD),
-        KMSRESTConstants.VERSION_NAME_FIELD);
-
-    byte[] encKeyMaterial = Base64.decodeBase64(checkNotNull((String)
-            encValueMap.get(KMSRESTConstants.MATERIAL_FIELD),
-        KMSRESTConstants.MATERIAL_FIELD));
-
-    return new KMSEncryptedKeyVersion(keyName, versionName, iv,
-        encVersionName, encKeyMaterial);
-  }
-
-  private static KeyVersion parseJSONKeyVersion(Map valueMap) {
-    KeyVersion keyVersion = null;
-    if (!valueMap.isEmpty()) {
-      byte[] material = (valueMap.containsKey(KMSRESTConstants.MATERIAL_FIELD))
-          ? Base64.decodeBase64((String) valueMap.get(KMSRESTConstants.MATERIAL_FIELD))
-          : null;
-      String versionName = (String)valueMap.get(KMSRESTConstants.VERSION_NAME_FIELD);
-      String keyName = (String)valueMap.get(KMSRESTConstants.NAME_FIELD);
-      keyVersion = new KMSKeyVersion(keyName, versionName, material);
-    }
-    return keyVersion;
-  }
-
-  @SuppressWarnings("unchecked")
-  private static Metadata parseJSONMetadata(Map valueMap) {
-    Metadata metadata = null;
-    if (!valueMap.isEmpty()) {
-      metadata = new KMSMetadata(
-          (String) valueMap.get(KMSRESTConstants.CIPHER_FIELD),
-          (Integer) valueMap.get(KMSRESTConstants.LENGTH_FIELD),
-          (String) valueMap.get(KMSRESTConstants.DESCRIPTION_FIELD),
-          (Map<String, String>) valueMap.get(KMSRESTConstants.ATTRIBUTES_FIELD),
-          new Date((Long) valueMap.get(KMSRESTConstants.CREATED_FIELD)),
-          (Integer) valueMap.get(KMSRESTConstants.VERSIONS_FIELD));
-    }
-    return metadata;
-  }
-
-  private static void writeJson(Map map, OutputStream os) throws IOException {
+  private static void writeJson(Object obj, OutputStream os)
+      throws IOException {
     Writer writer = new OutputStreamWriter(os, StandardCharsets.UTF_8);
     ObjectMapper jsonMapper = new ObjectMapper();
-    jsonMapper.writerWithDefaultPrettyPrinter().writeValue(writer, map);
+    jsonMapper.writerWithDefaultPrettyPrinter().writeValue(writer, obj);
   }
 
   /**
@@ -360,25 +300,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     }
   }
 
-  public static <T> T checkNotNull(T o, String name)
-      throws IllegalArgumentException {
-    if (o == null) {
-      throw new IllegalArgumentException("Parameter '" + name +
-          "' cannot be null");
-    }
-    return o;
-  }
-
-  public static String checkNotEmpty(String s, String name)
-      throws IllegalArgumentException {
-    checkNotNull(s, name);
-    if (s.isEmpty()) {
-      throw new IllegalArgumentException("Parameter '" + name +
-          "' cannot be empty");
-    }
-    return s;
-  }
-
   private String kmsUrl;
   private SSLFactory sslFactory;
   private ConnectionConfigurator configurator;
@@ -560,12 +481,12 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     return conn;
   }
 
-  private <T> T call(HttpURLConnection conn, Map jsonOutput,
+  private <T> T call(HttpURLConnection conn, Object jsonOutput,
       int expectedResponse, Class<T> klass) throws IOException {
     return call(conn, jsonOutput, expectedResponse, klass, authRetry);
   }
 
-  private <T> T call(HttpURLConnection conn, Map jsonOutput,
+  private <T> T call(HttpURLConnection conn, Object jsonOutput,
       int expectedResponse, Class<T> klass, int authRetryCount)
       throws IOException {
     T ret = null;
@@ -885,6 +806,48 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
   }
 
   @Override
+  public void reencryptEncryptedKeys(List<EncryptedKeyVersion> ekvs)
+      throws IOException, GeneralSecurityException {
+    checkNotNull(ekvs, "ekvs");
+    if (ekvs.isEmpty()) {
+      return;
+    }
+    final List<Map> jsonPayload = new ArrayList<>();
+    String keyName = null;
+    for (EncryptedKeyVersion ekv : ekvs) {
+      checkNotNull(ekv.getEncryptionKeyName(), "keyName");
+      checkNotNull(ekv.getEncryptionKeyVersionName(), "versionName");
+      checkNotNull(ekv.getEncryptedKeyIv(), "iv");
+      checkNotNull(ekv.getEncryptedKeyVersion(), "encryptedKey");
+      Preconditions.checkArgument(ekv.getEncryptedKeyVersion().getVersionName()
+              .equals(KeyProviderCryptoExtension.EEK),
+          "encryptedKey version name must be '%s', is '%s'",
+          KeyProviderCryptoExtension.EEK,
+          ekv.getEncryptedKeyVersion().getVersionName());
+      if (keyName == null) {
+        keyName = ekv.getEncryptionKeyName();
+      } else {
+        Preconditions.checkArgument(keyName.equals(ekv.getEncryptionKeyName()),
+            "All EncryptedKey must have the same key name.");
+      }
+      jsonPayload.add(KMSUtil.toJSON(ekv));
+    }
+    final URL url = createURL(KMSRESTConstants.KEY_RESOURCE, keyName,
+        KMSRESTConstants.REENCRYPT_BATCH_SUB_RESOURCE, null);
+    final HttpURLConnection conn = createConnection(url, HTTP_POST);
+    conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME);
+    final List<Map> response =
+        call(conn, jsonPayload, HttpURLConnection.HTTP_OK, List.class);
+    Preconditions.checkArgument(response.size() == ekvs.size(),
+        "Response size is different than input size.");
+    for (int i = 0; i < response.size(); ++i) {
+      final Map item = response.get(i);
+      final EncryptedKeyVersion ekv = parseJSONEncKeyVersion(keyName, item);
+      ekvs.set(i, ekv);
+    }
+  }
+
+  @Override
   public List<KeyVersion> getKeyVersions(String name) throws IOException {
     checkNotEmpty(name, "name");
     URL url = createURL(KMSRESTConstants.KEY_RESOURCE, name,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java
index 81bdd33..e0197e0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java
@@ -37,6 +37,7 @@ public class KMSRESTConstants {
   public static final String EEK_SUB_RESOURCE = "_eek";
   public static final String CURRENT_VERSION_SUB_RESOURCE = "_currentversion";
   public static final String INVALIDATECACHE_RESOURCE = "_invalidatecache";
+  public static final String REENCRYPT_BATCH_SUB_RESOURCE = "_reencryptbatch";
 
   public static final String KEY = "key";
   public static final String EEK_OP = "eek_op";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
index 6e010b1..de4d25a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
@@ -313,6 +313,26 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
   }
 
   @Override
+  public void reencryptEncryptedKeys(final List<EncryptedKeyVersion> ekvs)
+      throws IOException, GeneralSecurityException {
+    try {
+      doOp(new ProviderCallable<Void>() {
+        @Override
+        public Void call(KMSClientProvider provider)
+            throws IOException, GeneralSecurityException {
+          provider.reencryptEncryptedKeys(ekvs);
+          return null;
+        }
+      }, nextIdx());
+    } catch (WrapperException we) {
+      if (we.getCause() instanceof GeneralSecurityException) {
+        throw (GeneralSecurityException) we.getCause();
+      }
+      throw new IOException(we.getCause());
+    }
+  }
+
+  @Override
   public KeyVersion getKeyVersion(final String versionName) throws IOException {
     return doOp(new ProviderCallable<KeyVersion>() {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
index 5f783a9..c96c6fb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
@@ -17,15 +17,24 @@
  */
 package org.apache.hadoop.util;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
+import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
+import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Utils for KMS.
@@ -71,4 +80,129 @@ public final class KMSUtil {
     }
     return keyProvider;
   }
+
+  @SuppressWarnings("unchecked")
+  public static Map toJSON(KeyProvider.KeyVersion keyVersion) {
+    Map json = new HashMap();
+    if (keyVersion != null) {
+      json.put(KMSRESTConstants.NAME_FIELD,
+          keyVersion.getName());
+      json.put(KMSRESTConstants.VERSION_NAME_FIELD,
+          keyVersion.getVersionName());
+      json.put(KMSRESTConstants.MATERIAL_FIELD,
+          Base64.encodeBase64URLSafeString(
+              keyVersion.getMaterial()));
+    }
+    return json;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static Map toJSON(EncryptedKeyVersion encryptedKeyVersion) {
+    Map json = new HashMap();
+    if (encryptedKeyVersion != null) {
+      json.put(KMSRESTConstants.VERSION_NAME_FIELD,
+          encryptedKeyVersion.getEncryptionKeyVersionName());
+      json.put(KMSRESTConstants.IV_FIELD, Base64
+          .encodeBase64URLSafeString(encryptedKeyVersion.getEncryptedKeyIv()));
+      json.put(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD,
+          toJSON(encryptedKeyVersion.getEncryptedKeyVersion()));
+    }
+    return json;
+  }
+
+  public static <T> T checkNotNull(T o, String name)
+      throws IllegalArgumentException {
+    if (o == null) {
+      throw new IllegalArgumentException("Parameter '" + name +
+          "' cannot be null");
+    }
+    return o;
+  }
+
+  public static String checkNotEmpty(String s, String name)
+      throws IllegalArgumentException {
+    checkNotNull(s, name);
+    if (s.isEmpty()) {
+      throw new IllegalArgumentException("Parameter '" + name +
+          "' cannot be empty");
+    }
+    return s;
+  }
+
+  @SuppressWarnings("rawtypes")
+  public static List<EncryptedKeyVersion>
+      parseJSONEncKeyVersions(String keyName, List valueList) {
+    checkNotNull(valueList, "valueList");
+    List<EncryptedKeyVersion> ekvs = new ArrayList<>(valueList.size());
+    if (!valueList.isEmpty()) {
+      for (Object values : valueList) {
+        Map valueMap = (Map) values;
+        ekvs.add(parseJSONEncKeyVersion(keyName, valueMap));
+      }
+    }
+    return ekvs;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static EncryptedKeyVersion parseJSONEncKeyVersion(String keyName,
+      Map valueMap) {
+    checkNotNull(valueMap, "valueMap");
+    String versionName = checkNotNull(
+        (String) valueMap.get(KMSRESTConstants.VERSION_NAME_FIELD),
+        KMSRESTConstants.VERSION_NAME_FIELD);
+
+    byte[] iv = Base64.decodeBase64(checkNotNull(
+        (String) valueMap.get(KMSRESTConstants.IV_FIELD),
+        KMSRESTConstants.IV_FIELD));
+
+    Map encValueMap = checkNotNull((Map)
+            valueMap.get(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD),
+        KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD);
+
+    String encVersionName = checkNotNull((String)
+            encValueMap.get(KMSRESTConstants.VERSION_NAME_FIELD),
+        KMSRESTConstants.VERSION_NAME_FIELD);
+
+    byte[] encKeyMaterial = Base64.decodeBase64(checkNotNull((String)
+            encValueMap.get(KMSRESTConstants.MATERIAL_FIELD),
+        KMSRESTConstants.MATERIAL_FIELD));
+
+    return new KMSClientProvider.KMSEncryptedKeyVersion(keyName, versionName,
+        iv, encVersionName, encKeyMaterial);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static KeyProvider.KeyVersion parseJSONKeyVersion(Map valueMap) {
+    checkNotNull(valueMap, "valueMap");
+    KeyProvider.KeyVersion keyVersion = null;
+    if (!valueMap.isEmpty()) {
+      byte[] material =
+          (valueMap.containsKey(KMSRESTConstants.MATERIAL_FIELD)) ?
+              Base64.decodeBase64(
+                  (String) valueMap.get(KMSRESTConstants.MATERIAL_FIELD)) :
+              null;
+      String versionName =
+          (String) valueMap.get(KMSRESTConstants.VERSION_NAME_FIELD);
+      String keyName = (String) valueMap.get(KMSRESTConstants.NAME_FIELD);
+      keyVersion =
+          new KMSClientProvider.KMSKeyVersion(keyName, versionName, material);
+    }
+    return keyVersion;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static KeyProvider.Metadata parseJSONMetadata(Map valueMap) {
+    checkNotNull(valueMap, "valueMap");
+    KeyProvider.Metadata metadata = null;
+    if (!valueMap.isEmpty()) {
+      metadata = new KMSClientProvider.KMSMetadata(
+          (String) valueMap.get(KMSRESTConstants.CIPHER_FIELD),
+          (Integer) valueMap.get(KMSRESTConstants.LENGTH_FIELD),
+          (String) valueMap.get(KMSRESTConstants.DESCRIPTION_FIELD),
+          (Map<String, String>) valueMap.get(KMSRESTConstants.ATTRIBUTES_FIELD),
+          new Date((Long) valueMap.get(KMSRESTConstants.CREATED_FIELD)),
+          (Integer) valueMap.get(KMSRESTConstants.VERSIONS_FIELD));
+    }
+    return metadata;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderCryptoExtension.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderCryptoExtension.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderCryptoExtension.java
index 32938e3..e897423 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderCryptoExtension.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderCryptoExtension.java
@@ -22,6 +22,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.GeneralSecurityException;
 import java.security.SecureRandom;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -40,7 +41,9 @@ import org.junit.rules.Timeout;
 import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestKeyProviderCryptoExtension {
@@ -90,13 +93,7 @@ public class TestKeyProviderCryptoExtension {
     KeyVersion k1 = kpExt.decryptEncryptedKey(ek1);
     assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName());
     assertEquals(encryptionKey.getMaterial().length, k1.getMaterial().length);
-    if (Arrays.equals(k1.getMaterial(), encryptionKey.getMaterial())) {
-      fail("Encrypted key material should not equal encryption key material");
-    }
-    if (Arrays.equals(ek1.getEncryptedKeyVersion().getMaterial(),
-        encryptionKey.getMaterial())) {
-      fail("Encrypted key material should not equal decrypted key material");
-    }
+
     // Decrypt it again and it should be the same
     KeyVersion k1a = kpExt.decryptEncryptedKey(ek1);
     assertArrayEquals(k1.getMaterial(), k1a.getMaterial());
@@ -153,9 +150,6 @@ public class TestKeyProviderCryptoExtension {
     final KeyVersion k1 = kpExt.decryptEncryptedKey(ek1);
     assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName());
     assertEquals(encryptionKey.getMaterial().length, k1.getMaterial().length);
-    if (Arrays.equals(k1.getMaterial(), encryptionKey.getMaterial())) {
-      fail("Encrypted key material should not equal encryption key material");
-    }
 
     // Roll the EK
     kpExt.rollNewVersion(ek1.getEncryptionKeyName());
@@ -173,10 +167,7 @@ public class TestKeyProviderCryptoExtension {
     assertEquals("Length of encryption key material and EEK material should "
             + "be the same", encryptionKey.getMaterial().length,
         ek2.getEncryptedKeyVersion().getMaterial().length);
-    if (Arrays.equals(ek2.getEncryptedKeyVersion().getMaterial(),
-        encryptionKey.getMaterial())) {
-      fail("Encrypted key material should not equal decrypted key material");
-    }
+
     if (Arrays.equals(ek2.getEncryptedKeyVersion().getMaterial(),
         ek1.getEncryptedKeyVersion().getMaterial())) {
       fail("Re-encrypted EEK should have different material");
@@ -186,9 +177,6 @@ public class TestKeyProviderCryptoExtension {
     final KeyVersion k2 = kpExt.decryptEncryptedKey(ek2);
     assertEquals(KeyProviderCryptoExtension.EK, k2.getVersionName());
     assertEquals(encryptionKey.getMaterial().length, k2.getMaterial().length);
-    if (Arrays.equals(k2.getMaterial(), encryptionKey.getMaterial())) {
-      fail("Encrypted key material should not equal encryption key material");
-    }
 
     // Re-encrypting the same EEK with the same EK should be deterministic
     final KeyProviderCryptoExtension.EncryptedKeyVersion ek2a =
@@ -203,10 +191,7 @@ public class TestKeyProviderCryptoExtension {
     assertEquals("Length of encryption key material and EEK material should "
             + "be the same", encryptionKey.getMaterial().length,
         ek2a.getEncryptedKeyVersion().getMaterial().length);
-    if (Arrays.equals(ek2a.getEncryptedKeyVersion().getMaterial(),
-        encryptionKey.getMaterial())) {
-      fail("Encrypted key material should not equal decrypted key material");
-    }
+
     if (Arrays.equals(ek2a.getEncryptedKeyVersion().getMaterial(),
         ek1.getEncryptedKeyVersion().getMaterial())) {
       fail("Re-encrypted EEK should have different material");
@@ -227,10 +212,6 @@ public class TestKeyProviderCryptoExtension {
     assertEquals("Length of encryption key material and EEK material should "
             + "be the same", encryptionKey.getMaterial().length,
         ek3.getEncryptedKeyVersion().getMaterial().length);
-    if (Arrays.equals(ek3.getEncryptedKeyVersion().getMaterial(),
-        encryptionKey.getMaterial())) {
-      fail("Encrypted key material should not equal decrypted key material");
-    }
 
     if (Arrays.equals(ek3.getEncryptedKeyVersion().getMaterial(),
         ek1.getEncryptedKeyVersion().getMaterial())) {
@@ -241,6 +222,78 @@ public class TestKeyProviderCryptoExtension {
   }
 
   @Test
+  public void testReencryptEncryptedKeys() throws Exception {
+    List<EncryptedKeyVersion> ekvs = new ArrayList<>(4);
+    // Generate 2 new EEKs @v0 and add to the list
+    ekvs.add(kpExt.generateEncryptedKey(encryptionKey.getName()));
+    ekvs.add(kpExt.generateEncryptedKey(encryptionKey.getName()));
+
+    // Roll the EK
+    kpExt.rollNewVersion(ekvs.get(0).getEncryptionKeyName());
+    // Generate 1 new EEK @v1 add to the list.
+    ekvs.add(kpExt.generateEncryptedKey(encryptionKey.getName()));
+
+    // Roll the EK again
+    kpExt.rollNewVersion(ekvs.get(0).getEncryptionKeyName());
+    // Generate 1 new EEK @v2 add to the list.
+    ekvs.add(kpExt.generateEncryptedKey(encryptionKey.getName()));
+
+    // leave a deep copy of the original, for verification purpose.
+    List<EncryptedKeyVersion> ekvsOrig = new ArrayList<>(ekvs.size());
+    for (EncryptedKeyVersion ekv : ekvs) {
+      ekvsOrig.add(new EncryptedKeyVersion(ekv.getEncryptionKeyName(),
+          ekv.getEncryptionKeyVersionName(), ekv.getEncryptedKeyIv(),
+          ekv.getEncryptedKeyVersion()));
+    }
+
+    // Reencrypt ekvs
+    kpExt.reencryptEncryptedKeys(ekvs);
+
+    // Verify each ekv
+    for (int i = 0; i < ekvs.size(); ++i) {
+      final EncryptedKeyVersion ekv = ekvs.get(i);
+      final EncryptedKeyVersion orig = ekvsOrig.get(i);
+      assertEquals("Version name should be EEK",
+          KeyProviderCryptoExtension.EEK,
+          ekv.getEncryptedKeyVersion().getVersionName());
+      assertEquals("Encryption key name should be " + ENCRYPTION_KEY_NAME,
+          ENCRYPTION_KEY_NAME, ekv.getEncryptionKeyName());
+      assertNotNull("Expected encrypted key material",
+          ekv.getEncryptedKeyVersion().getMaterial());
+      assertEquals("Length of encryption key material and EEK material should "
+              + "be the same", encryptionKey.getMaterial().length,
+          ekv.getEncryptedKeyVersion().getMaterial().length);
+      assertFalse(
+          "Encrypted key material should not equal encryption key material",
+          Arrays.equals(ekv.getEncryptedKeyVersion().getMaterial(),
+              encryptionKey.getMaterial()));
+
+      if (i < 3) {
+        assertFalse("Re-encrypted EEK should have different material",
+            Arrays.equals(ekv.getEncryptedKeyVersion().getMaterial(),
+                orig.getEncryptedKeyVersion().getMaterial()));
+      } else {
+        assertTrue("Re-encrypted EEK should have same material",
+            Arrays.equals(ekv.getEncryptedKeyVersion().getMaterial(),
+                orig.getEncryptedKeyVersion().getMaterial()));
+      }
+
+      // Decrypt the new EEK into an EK and check it
+      final KeyVersion kv = kpExt.decryptEncryptedKey(ekv);
+      assertEquals(KeyProviderCryptoExtension.EK, kv.getVersionName());
+
+      // Decrypt it again and it should be the same
+      KeyVersion kv1 = kpExt.decryptEncryptedKey(ekv);
+      assertArrayEquals(kv.getMaterial(), kv1.getMaterial());
+
+      // Verify decrypting the new EEK and orig EEK gives the same material.
+      final KeyVersion origKv = kpExt.decryptEncryptedKey(orig);
+      assertTrue("Returned EEK and original EEK should both decrypt to the "
+          + "same kv.", Arrays.equals(origKv.getMaterial(), kv.getMaterial()));
+    }
+  }
+
+  @Test
   public void testNonDefaultCryptoExtensionSelectionWithCachingKeyProvider()
           throws Exception {
     Configuration config = new Configuration();
@@ -342,6 +395,11 @@ public class TestKeyProviderCryptoExtension {
     }
 
     @Override
+    public void reencryptEncryptedKeys(List<EncryptedKeyVersion> ekvs)
+        throws IOException, GeneralSecurityException {
+    }
+
+    @Override
     public KeyVersion decryptEncryptedKey(
             EncryptedKeyVersion encryptedKeyVersion)
             throws IOException, GeneralSecurityException {
@@ -453,5 +511,10 @@ public class TestKeyProviderCryptoExtension {
         throws IOException, GeneralSecurityException {
       return ekv;
     }
+
+    @Override
+    public void reencryptEncryptedKeys(List<EncryptedKeyVersion> ekvs)
+        throws IOException, GeneralSecurityException {
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java
index eb24394..273c673 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java
@@ -141,6 +141,12 @@ public class EagerKeyGeneratorKeyProviderCryptoExtension
         throws IOException, GeneralSecurityException {
       return keyProviderCryptoExtension.reencryptEncryptedKey(ekv);
     }
+
+    @Override
+    public void reencryptEncryptedKeys(List<EncryptedKeyVersion> ekvs)
+        throws IOException, GeneralSecurityException {
+      keyProviderCryptoExtension.reencryptEncryptedKeys(ekvs);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java
index 27cc05d..dfc6872 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.crypto.key.kms.server;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import org.apache.hadoop.util.KMSUtil;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -53,6 +56,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.util.KMSUtil.checkNotEmpty;
+import static org.apache.hadoop.util.KMSUtil.checkNotNull;
+
 /**
  * Class providing the REST bindings, via Jersey, for the KMS.
  */
@@ -64,7 +70,7 @@ public class KMS {
     CREATE_KEY, DELETE_KEY, ROLL_NEW_VERSION, INVALIDATE_CACHE,
     GET_KEYS, GET_KEYS_METADATA,
     GET_KEY_VERSIONS, GET_METADATA, GET_KEY_VERSION, GET_CURRENT_KEY,
-    GENERATE_EEK, DECRYPT_EEK, REENCRYPT_EEK
+    GENERATE_EEK, DECRYPT_EEK, REENCRYPT_EEK, REENCRYPT_EEK_BATCH
   }
 
   private KeyProviderCryptoExtension provider;
@@ -72,6 +78,8 @@ public class KMS {
 
   static final Logger LOG = LoggerFactory.getLogger(KMS.class);
 
+  private static final int MAX_NUM_PER_BATCH = 10000;
+
   public KMS() throws Exception {
     provider = KMSWebApp.getKeyProvider();
     kmsAudit= KMSWebApp.getKMSAudit();
@@ -109,7 +117,7 @@ public class KMS {
       KMSWebApp.getAdminCallsMeter().mark();
       UserGroupInformation user = HttpUserGroupInformation.get();
       final String name = (String) jsonKey.get(KMSRESTConstants.NAME_FIELD);
-      KMSClientProvider.checkNotEmpty(name, KMSRESTConstants.NAME_FIELD);
+      checkNotEmpty(name, KMSRESTConstants.NAME_FIELD);
       assertAccess(KMSACLs.Type.CREATE, user, KMSOp.CREATE_KEY, name);
       String cipher = (String) jsonKey.get(KMSRESTConstants.CIPHER_FIELD);
       final String material;
@@ -158,7 +166,7 @@ public class KMS {
       if (!KMSWebApp.getACLs().hasAccess(KMSACLs.Type.GET, user)) {
         keyVersion = removeKeyMaterial(keyVersion);
       }
-      Map json = KMSServerJSONUtils.toJSON(keyVersion);
+      Map json = KMSUtil.toJSON(keyVersion);
       String requestURL = KMSMDCFilter.getURL();
       int idx = requestURL.lastIndexOf(KMSRESTConstants.KEYS_RESOURCE);
       requestURL = requestURL.substring(0, idx);
@@ -181,7 +189,7 @@ public class KMS {
       KMSWebApp.getAdminCallsMeter().mark();
       UserGroupInformation user = HttpUserGroupInformation.get();
       assertAccess(KMSACLs.Type.DELETE, user, KMSOp.DELETE_KEY, name);
-      KMSClientProvider.checkNotEmpty(name, "name");
+      checkNotEmpty(name, "name");
       LOG.debug("Deleting key with name {}.", name);
       user.doAs(new PrivilegedExceptionAction<Void>() {
         @Override
@@ -212,7 +220,7 @@ public class KMS {
       KMSWebApp.getAdminCallsMeter().mark();
       UserGroupInformation user = HttpUserGroupInformation.get();
       assertAccess(KMSACLs.Type.ROLLOVER, user, KMSOp.ROLL_NEW_VERSION, name);
-      KMSClientProvider.checkNotEmpty(name, "name");
+      checkNotEmpty(name, "name");
       LOG.debug("Rolling key with name {}.", name);
       final String material = (String)
               jsonMaterial.get(KMSRESTConstants.MATERIAL_FIELD);
@@ -242,7 +250,7 @@ public class KMS {
       if (!KMSWebApp.getACLs().hasAccess(KMSACLs.Type.GET, user)) {
         keyVersion = removeKeyMaterial(keyVersion);
       }
-      Map json = KMSServerJSONUtils.toJSON(keyVersion);
+      Map json = KMSUtil.toJSON(keyVersion);
       LOG.trace("Exiting rolloverKey Method.");
       return Response.ok().type(MediaType.APPLICATION_JSON).entity(json)
               .build();
@@ -260,7 +268,7 @@ public class KMS {
     try {
       LOG.trace("Entering invalidateCache Method.");
       KMSWebApp.getAdminCallsMeter().mark();
-      KMSClientProvider.checkNotEmpty(name, "name");
+      checkNotEmpty(name, "name");
       UserGroupInformation user = HttpUserGroupInformation.get();
       assertAccess(KMSACLs.Type.ROLLOVER, user, KMSOp.INVALIDATE_CACHE, name);
       LOG.debug("Invalidating cache with key name {}.", name);
@@ -369,7 +377,7 @@ public class KMS {
     try {
       LOG.trace("Entering getMetadata method.");
       UserGroupInformation user = HttpUserGroupInformation.get();
-      KMSClientProvider.checkNotEmpty(name, "name");
+      checkNotEmpty(name, "name");
       KMSWebApp.getAdminCallsMeter().mark();
       assertAccess(KMSACLs.Type.GET_METADATA, user, KMSOp.GET_METADATA, name);
       LOG.debug("Getting metadata for key with name {}.", name);
@@ -403,7 +411,7 @@ public class KMS {
     try {
       LOG.trace("Entering getCurrentVersion method.");
       UserGroupInformation user = HttpUserGroupInformation.get();
-      KMSClientProvider.checkNotEmpty(name, "name");
+      checkNotEmpty(name, "name");
       KMSWebApp.getKeyCallsMeter().mark();
       assertAccess(KMSACLs.Type.GET, user, KMSOp.GET_CURRENT_KEY, name);
       LOG.debug("Getting key version for key with name {}.", name);
@@ -417,7 +425,7 @@ public class KMS {
             }
       );
 
-      Object json = KMSServerJSONUtils.toJSON(keyVersion);
+      Object json = KMSUtil.toJSON(keyVersion);
       kmsAudit.ok(user, KMSOp.GET_CURRENT_KEY, name, "");
       LOG.trace("Exiting getCurrentVersion method.");
       return Response.ok().type(MediaType.APPLICATION_JSON).entity(json)
@@ -436,7 +444,7 @@ public class KMS {
     try {
       LOG.trace("Entering getKeyVersion method.");
       UserGroupInformation user = HttpUserGroupInformation.get();
-      KMSClientProvider.checkNotEmpty(versionName, "versionName");
+      checkNotEmpty(versionName, "versionName");
       KMSWebApp.getKeyCallsMeter().mark();
       assertAccess(KMSACLs.Type.GET, user, KMSOp.GET_KEY_VERSION);
       LOG.debug("Getting key with version name {}.", versionName);
@@ -453,7 +461,7 @@ public class KMS {
       if (keyVersion != null) {
         kmsAudit.ok(user, KMSOp.GET_KEY_VERSION, keyVersion.getName(), "");
       }
-      Object json = KMSServerJSONUtils.toJSON(keyVersion);
+      Object json = KMSUtil.toJSON(keyVersion);
       LOG.trace("Exiting getKeyVersion method.");
       return Response.ok().type(MediaType.APPLICATION_JSON).entity(json)
               .build();
@@ -477,8 +485,8 @@ public class KMS {
     try {
       LOG.trace("Entering generateEncryptedKeys method.");
       UserGroupInformation user = HttpUserGroupInformation.get();
-      KMSClientProvider.checkNotEmpty(name, "name");
-      KMSClientProvider.checkNotNull(edekOp, "eekOp");
+      checkNotEmpty(name, "name");
+      checkNotNull(edekOp, "eekOp");
       LOG.debug("Generating encrypted key with name {}," +
               " the edek Operation is {}.", name, edekOp);
 
@@ -512,7 +520,7 @@ public class KMS {
         kmsAudit.ok(user, KMSOp.GENERATE_EEK, name, "");
         retJSON = new ArrayList();
         for (EncryptedKeyVersion edek : retEdeks) {
-          ((ArrayList) retJSON).add(KMSServerJSONUtils.toJSON(edek));
+          ((ArrayList) retJSON).add(KMSUtil.toJSON(edek));
         }
       } else {
         StringBuilder error;
@@ -537,6 +545,64 @@ public class KMS {
 
   @SuppressWarnings("rawtypes")
   @POST
+  @Path(KMSRESTConstants.KEY_RESOURCE + "/{name:.*}/" +
+      KMSRESTConstants.REENCRYPT_BATCH_SUB_RESOURCE)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8)
+  public Response reencryptEncryptedKeys(
+      @PathParam("name") final String name,
+      final List<Map> jsonPayload)
+      throws Exception {
+    LOG.trace("Entering reencryptEncryptedKeys method.");
+    try {
+      final Stopwatch sw = new Stopwatch().start();
+      checkNotEmpty(name, "name");
+      checkNotNull(jsonPayload, "jsonPayload");
+      final UserGroupInformation user = HttpUserGroupInformation.get();
+      KMSWebApp.getReencryptEEKBatchCallsMeter().mark();
+      if (jsonPayload.size() > MAX_NUM_PER_BATCH) {
+        LOG.warn("Payload size {} too big for reencryptEncryptedKeys from"
+            + " user {}.", jsonPayload.size(), user);
+      }
+      assertAccess(KMSACLs.Type.GENERATE_EEK, user, KMSOp.REENCRYPT_EEK_BATCH,
+          name);
+      LOG.debug("Batch reencrypting {} Encrypted Keys for key name {}",
+          jsonPayload.size(), name);
+      final List<EncryptedKeyVersion> ekvs =
+          KMSUtil.parseJSONEncKeyVersions(name, jsonPayload);
+      Preconditions.checkArgument(ekvs.size() == jsonPayload.size(),
+          "EncryptedKey size mismatch after parsing from json");
+      for (EncryptedKeyVersion ekv : ekvs) {
+        Preconditions.checkArgument(name.equals(ekv.getEncryptionKeyName()),
+            "All EncryptedKeys must be under the given key name " + name);
+      }
+
+      user.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          provider.reencryptEncryptedKeys(ekvs);
+          return null;
+        }
+      });
+      List retJSON = new ArrayList<>(ekvs.size());
+      for (EncryptedKeyVersion ekv: ekvs) {
+        retJSON.add(KMSUtil.toJSON(ekv));
+      }
+      kmsAudit.ok(user, KMSOp.REENCRYPT_EEK_BATCH, name,
+          "reencrypted " + ekvs.size() + " keys");
+      LOG.info("reencryptEncryptedKeys {} keys for key {} took {}",
+          jsonPayload.size(), name, sw.stop());
+      LOG.trace("Exiting reencryptEncryptedKeys method.");
+      return Response.ok().type(MediaType.APPLICATION_JSON).entity(retJSON)
+          .build();
+    } catch (Exception e) {
+      LOG.debug("Exception in reencryptEncryptedKeys.", e);
+      throw e;
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @POST
   @Path(KMSRESTConstants.KEY_VERSION_RESOURCE + "/{versionName:.*}/" +
       KMSRESTConstants.EEK_SUB_RESOURCE)
   @Produces(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8)
@@ -548,8 +614,8 @@ public class KMS {
     try {
       LOG.trace("Entering decryptEncryptedKey method.");
       UserGroupInformation user = HttpUserGroupInformation.get();
-      KMSClientProvider.checkNotEmpty(versionName, "versionName");
-      KMSClientProvider.checkNotNull(eekOp, "eekOp");
+      checkNotEmpty(versionName, "versionName");
+      checkNotNull(eekOp, "eekOp");
       LOG.debug("Decrypting key for {}, the edek Operation is {}.",
               versionName, eekOp);
 
@@ -558,13 +624,14 @@ public class KMS {
       String ivStr = (String) jsonPayload.get(KMSRESTConstants.IV_FIELD);
       String encMaterialStr =
               (String) jsonPayload.get(KMSRESTConstants.MATERIAL_FIELD);
-      KMSClientProvider.checkNotNull(ivStr, KMSRESTConstants.IV_FIELD);
+      checkNotNull(ivStr, KMSRESTConstants.IV_FIELD);
       final byte[] iv = Base64.decodeBase64(ivStr);
-      KMSClientProvider.checkNotNull(encMaterialStr,
+      checkNotNull(encMaterialStr,
           KMSRESTConstants.MATERIAL_FIELD);
       final byte[] encMaterial = Base64.decodeBase64(encMaterialStr);
       Object retJSON;
       if (eekOp.equals(KMSRESTConstants.EEK_DECRYPT)) {
+        KMSWebApp.getDecryptEEKCallsMeter().mark();
         assertAccess(KMSACLs.Type.DECRYPT_EEK, user, KMSOp.DECRYPT_EEK,
                 keyName);
 
@@ -582,9 +649,10 @@ public class KMS {
               }
         );
 
-        retJSON = KMSServerJSONUtils.toJSON(retKeyVersion);
+        retJSON = KMSUtil.toJSON(retKeyVersion);
         kmsAudit.ok(user, KMSOp.DECRYPT_EEK, keyName, "");
       } else if (eekOp.equals(KMSRESTConstants.EEK_REENCRYPT)) {
+        KMSWebApp.getReencryptEEKCallsMeter().mark();
         assertAccess(KMSACLs.Type.GENERATE_EEK, user, KMSOp.REENCRYPT_EEK,
             keyName);
 
@@ -599,7 +667,7 @@ public class KMS {
               }
             });
 
-        retJSON = KMSServerJSONUtils.toJSON(retEncryptedKeyVersion);
+        retJSON = KMSUtil.toJSON(retEncryptedKeyVersion);
         kmsAudit.ok(user, KMSOp.REENCRYPT_EEK, keyName, "");
       } else {
         StringBuilder error;
@@ -612,7 +680,6 @@ public class KMS {
         LOG.error(error.toString());
         throw new IllegalArgumentException(error.toString());
       }
-      KMSWebApp.getDecryptEEKCallsMeter().mark();
       LOG.trace("Exiting handleEncryptedKeyOp method.");
       return Response.ok().type(MediaType.APPLICATION_JSON).entity(retJSON)
           .build();
@@ -631,7 +698,7 @@ public class KMS {
     try {
       LOG.trace("Entering getKeyVersions method.");
       UserGroupInformation user = HttpUserGroupInformation.get();
-      KMSClientProvider.checkNotEmpty(name, "name");
+      checkNotEmpty(name, "name");
       KMSWebApp.getKeyCallsMeter().mark();
       assertAccess(KMSACLs.Type.GET, user, KMSOp.GET_KEY_VERSIONS, name);
       LOG.debug("Getting key versions for key {}", name);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSJSONReader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSJSONReader.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSJSONReader.java
index 14a5451..f6f670b 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSJSONReader.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSJSONReader.java
@@ -31,21 +31,23 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Type;
+import java.util.List;
 import java.util.Map;
 
 @Provider
 @Consumes(MediaType.APPLICATION_JSON)
 @InterfaceAudience.Private
-public class KMSJSONReader implements MessageBodyReader<Map> {
+public class KMSJSONReader implements MessageBodyReader<Object> {
 
   @Override
   public boolean isReadable(Class<?> type, Type genericType,
       Annotation[] annotations, MediaType mediaType) {
-    return type.isAssignableFrom(Map.class);
+    return type.isAssignableFrom(Map.class) || type
+        .isAssignableFrom(List.class);
   }
 
   @Override
-  public Map readFrom(Class<Map> type, Type genericType,
+  public Object readFrom(Class<Object> type, Type genericType,
       Annotation[] annotations, MediaType mediaType,
       MultivaluedMap<String, String> httpHeaders, InputStream entityStream)
       throws IOException, WebApplicationException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java
index 24af81b..d7e6e46 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java
@@ -17,11 +17,10 @@
  */
 package org.apache.hadoop.crypto.key.kms.server;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.crypto.key.KeyProvider;
-import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
+import org.apache.hadoop.util.KMSUtil;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -33,48 +32,19 @@ import java.util.Map;
  */
 @InterfaceAudience.Private
 public class KMSServerJSONUtils {
-  @SuppressWarnings("unchecked")
-  public static Map toJSON(KeyProvider.KeyVersion keyVersion) {
-    Map json = new LinkedHashMap();
-    if (keyVersion != null) {
-      json.put(KMSRESTConstants.NAME_FIELD,
-          keyVersion.getName());
-      json.put(KMSRESTConstants.VERSION_NAME_FIELD,
-          keyVersion.getVersionName());
-      json.put(KMSRESTConstants.MATERIAL_FIELD,
-          Base64.encodeBase64URLSafeString(
-              keyVersion.getMaterial()));
-    }
-    return json;
-  }
 
   @SuppressWarnings("unchecked")
   public static List toJSON(List<KeyProvider.KeyVersion> keyVersions) {
     List json = new ArrayList();
     if (keyVersions != null) {
       for (KeyProvider.KeyVersion version : keyVersions) {
-        json.add(toJSON(version));
+        json.add(KMSUtil.toJSON(version));
       }
     }
     return json;
   }
 
   @SuppressWarnings("unchecked")
-  public static Map toJSON(EncryptedKeyVersion encryptedKeyVersion) {
-    Map json = new LinkedHashMap();
-    if (encryptedKeyVersion != null) {
-      json.put(KMSRESTConstants.VERSION_NAME_FIELD,
-          encryptedKeyVersion.getEncryptionKeyVersionName());
-      json.put(KMSRESTConstants.IV_FIELD,
-          Base64.encodeBase64URLSafeString(
-              encryptedKeyVersion.getEncryptedKeyIv()));
-      json.put(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD,
-          toJSON(encryptedKeyVersion.getEncryptedKeyVersion()));
-    }
-    return json;
-  }
-
-  @SuppressWarnings("unchecked")
   public static Map toJSON(String keyName, KeyProvider.Metadata meta) {
     Map json = new LinkedHashMap();
     if (meta != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
index ac24105..9a71fa2 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
@@ -60,6 +60,10 @@ public class KMSWebApp implements ServletContextListener {
       "generate_eek.calls.meter";
   private static final String DECRYPT_EEK_METER = METRICS_PREFIX +
       "decrypt_eek.calls.meter";
+  private static final String REENCRYPT_EEK_METER = METRICS_PREFIX +
+      "reencrypt_eek.calls.meter";
+  private static final String REENCRYPT_EEK_BATCH_METER = METRICS_PREFIX +
+      "reencrypt_eek_batch.calls.meter";
 
   private static Logger LOG;
   private static MetricRegistry metricRegistry;
@@ -72,6 +76,8 @@ public class KMSWebApp implements ServletContextListener {
   private static Meter unauthorizedCallsMeter;
   private static Meter unauthenticatedCallsMeter;
   private static Meter decryptEEKCallsMeter;
+  private static Meter reencryptEEKCallsMeter;
+  private static Meter reencryptEEKBatchCallsMeter;
   private static Meter generateEEKCallsMeter;
   private static Meter invalidCallsMeter;
   private static KMSAudit kmsAudit;
@@ -131,6 +137,10 @@ public class KMSWebApp implements ServletContextListener {
           new Meter());
       decryptEEKCallsMeter = metricRegistry.register(DECRYPT_EEK_METER,
           new Meter());
+      reencryptEEKCallsMeter = metricRegistry.register(REENCRYPT_EEK_METER,
+          new Meter());
+      reencryptEEKBatchCallsMeter = metricRegistry.register(
+          REENCRYPT_EEK_BATCH_METER, new Meter());
       adminCallsMeter = metricRegistry.register(ADMIN_CALLS_METER, new Meter());
       keyCallsMeter = metricRegistry.register(KEY_CALLS_METER, new Meter());
       invalidCallsMeter = metricRegistry.register(INVALID_CALLS_METER,
@@ -239,6 +249,14 @@ public class KMSWebApp implements ServletContextListener {
     return decryptEEKCallsMeter;
   }
 
+  public static Meter getReencryptEEKCallsMeter() {
+    return reencryptEEKCallsMeter;
+  }
+
+  public static Meter getReencryptEEKBatchCallsMeter() {
+    return reencryptEEKBatchCallsMeter;
+  }
+
   public static Meter getUnauthorizedCallsMeter() {
     return unauthorizedCallsMeter;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KeyAuthorizationKeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KeyAuthorizationKeyProvider.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KeyAuthorizationKeyProvider.java
index d53cdea..868db76 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KeyAuthorizationKeyProvider.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KeyAuthorizationKeyProvider.java
@@ -289,6 +289,25 @@ public class KeyAuthorizationKeyProvider extends KeyProviderCryptoExtension {
   }
 
   @Override
+  public void reencryptEncryptedKeys(List<EncryptedKeyVersion> ekvs)
+      throws IOException, GeneralSecurityException {
+    if (ekvs.isEmpty()) {
+      return;
+    }
+    readLock.lock();
+    try {
+      for (EncryptedKeyVersion ekv : ekvs) {
+        verifyKeyVersionBelongsToKey(ekv);
+      }
+      final String keyName = ekvs.get(0).getEncryptionKeyName();
+      doAccessCheck(keyName, KeyOpType.GENERATE_EEK);
+      provider.reencryptEncryptedKeys(ekvs);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
   public KeyVersion getKeyVersion(String versionName) throws IOException {
     readLock.lock();
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-kms/src/site/markdown/index.md.vm
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/site/markdown/index.md.vm b/hadoop-common-project/hadoop-kms/src/site/markdown/index.md.vm
index 4573b06..1dd89e9 100644
--- a/hadoop-common-project/hadoop-kms/src/site/markdown/index.md.vm
+++ b/hadoop-common-project/hadoop-kms/src/site/markdown/index.md.vm
@@ -506,7 +506,7 @@ $H5 Key ACLs
 KMS supports access control for all non-read operations at the Key level. All Key Access operations are classified as :
 
 * MANAGEMENT - createKey, deleteKey, rolloverNewVersion
-* GENERATE_EEK - generateEncryptedKey, reencryptEncryptedKey, warmUpEncryptedKeys
+* GENERATE_EEK - generateEncryptedKey, reencryptEncryptedKey, reencryptEncryptedKeys, warmUpEncryptedKeys
 * DECRYPT_EEK - decryptEncryptedKey
 * READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata, getCurrentKey
 * ALL - all of the above
@@ -983,6 +983,64 @@ This is usually useful after a [Rollover](#Rollover_Key) of an encryption key. R
       }
     }
 
+$H4 Batch Re-encrypt Encrypted Keys With The Latest KeyVersion
+
+Batched version of the above Re-encrypt Encrypted Key operation. This command takes a list of previously generated encrypted keys, re-encrypts them using the latest KeyVersion of the encryption key in the KeyProvider, and returns the re-encrypted encrypted keys in the same order. For each encrypted key, if the latest KeyVersion is the same as the one used to generate the encrypted key, no action is taken and the same encrypted key is returned.
+
+This is usually useful after a [Rollover](#Rollover_Key) of an encryption key. Re-encrypting the encrypted key will allow it to be encrypted using the latest version of the encryption key, but still with the same key material and initialization vector.
+
+All encrypted keys in a batch request must be under the same encryption key name, but they may be under different versions of that encryption key.
+
+*REQUEST:*
+
+    POST http://HOST:PORT/kms/v1/key/<key-name>/_reencryptbatch
+    Content-Type: application/json
+
+    [
+      {
+        "versionName"         : "<encryptionVersionName>",
+        "iv"                  : "<iv>",            //base64
+        "encryptedKeyVersion" : {
+            "versionName"       : "EEK",
+            "material"          : "<material>",    //base64
+        }
+      },
+      {
+        "versionName"         : "<encryptionVersionName>",
+        "iv"                  : "<iv>",            //base64
+        "encryptedKeyVersion" : {
+            "versionName"       : "EEK",
+            "material"          : "<material>",    //base64
+        }
+      },
+      ...
+    ]
+
+*RESPONSE:*
+
+    200 OK
+    Content-Type: application/json
+
+    [
+      {
+        "versionName"         : "<encryptionVersionName>",
+        "iv"                  : "<iv>",            //base64
+        "encryptedKeyVersion" : {
+            "versionName"       : "EEK",
+            "material"          : "<material>",    //base64
+        }
+      },
+      {
+        "versionName"         : "<encryptionVersionName>",
+        "iv"                  : "<iv>",            //base64
+        "encryptedKeyVersion" : {
+            "versionName"       : "EEK",
+            "material"          : "<material>",    //base64
+        }
+      },
+      ...
+    ]
+
 $H4 Get Key Version
 
 *REQUEST:*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
index a45906a..d6f2c25 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
@@ -97,6 +97,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 public class TestKMS {
@@ -722,6 +723,22 @@ public class TestKMS {
         assertArrayEquals(k1.getMaterial(), k1r.getMaterial());
         assertEquals(kv.getMaterial().length, k1r.getMaterial().length);
 
+        // test re-encrypt batch
+        EncryptedKeyVersion ek3 = kpExt.generateEncryptedKey(kv.getName());
+        KeyVersion latest = kpExt.rollNewVersion(kv.getName());
+        List<EncryptedKeyVersion> ekvs = new ArrayList<>(3);
+        ekvs.add(ek1);
+        ekvs.add(ek2);
+        ekvs.add(ek3);
+        ekvs.add(ek1);
+        ekvs.add(ek2);
+        ekvs.add(ek3);
+        kpExt.reencryptEncryptedKeys(ekvs);
+        for (EncryptedKeyVersion ekv: ekvs) {
+          assertEquals(latest.getVersionName(),
+              ekv.getEncryptionKeyVersionName());
+        }
+
         // deleteKey()
         kp.deleteKey("k1");
 
@@ -1134,6 +1151,10 @@ public class TestKMS {
                 KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
             EncryptedKeyVersion ekv = kpce.generateEncryptedKey("k1");
             kpce.reencryptEncryptedKey(ekv);
+            List<EncryptedKeyVersion> ekvs = new ArrayList<>(2);
+            ekvs.add(ekv);
+            ekvs.add(ekv);
+            kpce.reencryptEncryptedKeys(ekvs);
             return null;
           }
         });
@@ -1563,6 +1584,10 @@ public class TestKMS {
             KeyProviderCryptoExtension kpCE = KeyProviderCryptoExtension.
                 createKeyProviderCryptoExtension(kp);
             kpCE.reencryptEncryptedKey(encKv);
+            List<EncryptedKeyVersion> ekvs = new ArrayList<>(2);
+            ekvs.add(encKv);
+            ekvs.add(encKv);
+            kpCE.reencryptEncryptedKeys(ekvs);
             return null;
           }
         });
@@ -1669,8 +1694,27 @@ public class TestKMS {
               KeyProviderCryptoExtension kpCE = KeyProviderCryptoExtension.
                   createKeyProviderCryptoExtension(kp);
               kpCE.reencryptEncryptedKey(encKv);
+              fail("Should not have been able to reencryptEncryptedKey");
+            } catch (AuthorizationException ex) {
+              LOG.info("reencryptEncryptedKey caught expected exception.", ex);
+            }
+            return null;
+          }
+        });
+        doAs("GENERATE_EEK", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            KeyProvider kp = createProvider(uri, conf);
+            try {
+              KeyProviderCryptoExtension kpCE = KeyProviderCryptoExtension.
+                  createKeyProviderCryptoExtension(kp);
+              List<EncryptedKeyVersion> ekvs = new ArrayList<>(2);
+              ekvs.add(encKv);
+              ekvs.add(encKv);
+              kpCE.reencryptEncryptedKeys(ekvs);
+              fail("Should not have been able to reencryptEncryptedKeys");
             } catch (AuthorizationException ex) {
-              LOG.info("Caught expected exception.", ex);
+              LOG.info("reencryptEncryptedKeys caught expected exception.", ex);
             }
             return null;
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec5acc7/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSAudit.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSAudit.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSAudit.java
index bd4ddbd..2e01f98 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSAudit.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSAudit.java
@@ -115,6 +115,9 @@ public class TestKMSAudit {
     kmsAudit.ok(luser, KMSOp.REENCRYPT_EEK, "k1", "testmsg");
     kmsAudit.ok(luser, KMSOp.REENCRYPT_EEK, "k1", "testmsg");
     kmsAudit.evictCacheForTesting();
+    kmsAudit.ok(luser, KMSOp.REENCRYPT_EEK_BATCH, "k1", "testmsg");
+    kmsAudit.ok(luser, KMSOp.REENCRYPT_EEK_BATCH, "k1", "testmsg");
+    kmsAudit.evictCacheForTesting();
     String out = getAndResetLogOutput();
     System.out.println(out);
     Assert.assertTrue(
@@ -128,7 +131,9 @@ public class TestKMSAudit {
             + "OK\\[op=DECRYPT_EEK, key=k1, user=luser, accessCount=6, interval=[^m]{1,4}ms\\] testmsg"
             + "OK\\[op=DECRYPT_EEK, key=k1, user=luser, accessCount=1, interval=[^m]{1,4}ms\\] testmsg"
             + "OK\\[op=REENCRYPT_EEK, key=k1, user=luser, accessCount=1, interval=[^m]{1,4}ms\\] testmsg"
-            + "OK\\[op=REENCRYPT_EEK, key=k1, user=luser, accessCount=3, interval=[^m]{1,4}ms\\] testmsg"));
+            + "OK\\[op=REENCRYPT_EEK, key=k1, user=luser, accessCount=3, interval=[^m]{1,4}ms\\] testmsg"
+            + "OK\\[op=REENCRYPT_EEK_BATCH, key=k1, user=luser\\] testmsg"
+            + "OK\\[op=REENCRYPT_EEK_BATCH, key=k1, user=luser\\] testmsg"));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org