You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2013/12/20 01:25:43 UTC

svn commit: r1552462 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/crypto/ src/main/java/org/apache/hadoop/crypto/key/ src/main/java/org/apache/hadoop/security/ src/main/resources/META-INF/services/ s...

Author: omalley
Date: Fri Dec 20 00:25:42 2013
New Revision: 1552462

URL: http://svn.apache.org/r1552462
Log:
HADOOP-10141. Create KeyProvider API to separate encryption key storage
from the applications. (omalley)

Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/JavaKeyStoreProvider.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProvider.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/UserProvider.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProvider.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderFactory.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1552462&r1=1552461&r2=1552462&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Fri Dec 20 00:25:42 2013
@@ -105,6 +105,9 @@ Trunk (Unreleased)
 
     HADOOP-9833 move slf4j to version 1.7.5 (Kousuke Saruta via stevel)
 
+    HADOOP-10141. Create KeyProvider API to separate encryption key storage
+    from the applications. (omalley)
+
   BUG FIXES
 
     HADOOP-9451. Fault single-layer config if node group topology is enabled.

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml?rev=1552462&r1=1552461&r2=1552462&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml Fri Dec 20 00:25:42 2013
@@ -210,6 +210,10 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-auth</artifactId>
       <scope>compile</scope>

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/JavaKeyStoreProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/JavaKeyStoreProvider.java?rev=1552462&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/JavaKeyStoreProvider.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/JavaKeyStoreProvider.java Fri Dec 20 00:25:42 2013
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.crypto.key;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import javax.crypto.spec.SecretKeySpec;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.security.Key;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * KeyProvider based on Java's KeyStore file format. The file may be stored in
+ * any Hadoop FileSystem using the following name mangling:
+ *  jks://hdfs@nn1.example.com/my/keys.jks -> hdfs://nn1.example.com/my/keys.jks
+ *  jks://file/home/owen/keys.jks -> file:///home/owen/keys.jks
+ *
+ * The password for the keystore is taken from the HADOOP_KEYSTORE_PASSWORD
+ * environment variable with a default of 'none'.
+ *
+ * It is expected for encrypted InputFormats and OutputFormats to copy the keys
+ * from the original provider into the job's Credentials object, which is
+ * accessed via the UserProvider. Therefore, this provider won't be used by
+ * MapReduce tasks.
+ */
+@InterfaceAudience.Private
+public class JavaKeyStoreProvider extends KeyProvider {
+  public static final String SCHEME_NAME = "jceks";
+  public static final String KEYSTORE_PASSWORD_NAME =
+      "HADOOP_KEYSTORE_PASSWORD";
+  public static final String KEYSTORE_PASSWORD_DEFAULT = "none";
+
+  private final URI uri;
+  private final Path path;
+  private final FileSystem fs;
+  private final KeyStore keyStore;
+  private final char[] password;
+  private boolean changed = false;
+
+  private final Map<String, Metadata> cache = new HashMap<String, Metadata>();
+
+  private JavaKeyStoreProvider(URI uri, Configuration conf) throws IOException {
+    this.uri = uri;
+    path = unnestUri(uri);
+    fs = FileSystem.get(conf);
+    // Get the password from the user's environment
+    String pw = System.getenv(KEYSTORE_PASSWORD_NAME);
+    if (pw == null) {
+      pw = KEYSTORE_PASSWORD_DEFAULT;
+    }
+    password = pw.toCharArray();
+    try {
+      keyStore = KeyStore.getInstance(SCHEME_NAME);
+      if (fs.exists(path)) {
+        keyStore.load(fs.open(path), password);
+      } else {
+        // required to create an empty keystore. *sigh*
+        keyStore.load(null, password);
+      }
+    } catch (KeyStoreException e) {
+      throw new IOException("Can't create keystore", e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException("Can't load keystore " + path, e);
+    } catch (CertificateException e) {
+      throw new IOException("Can't load keystore " + path, e);
+    }
+  }
+
+  @Override
+  public KeyVersion getKeyVersion(String versionName) throws IOException {
+    SecretKeySpec key = null;
+    try {
+      if (!keyStore.containsAlias(versionName)) {
+        return null;
+      }
+      key = (SecretKeySpec) keyStore.getKey(versionName, password);
+    } catch (KeyStoreException e) {
+      throw new IOException("Can't get key " + versionName + " from " +
+                            path, e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException("Can't get algorithm for key " + key + " from " +
+                            path, e);
+    } catch (UnrecoverableKeyException e) {
+      throw new IOException("Can't recover key " + key + " from " + path, e);
+    }
+    return new KeyVersion(versionName, key.getEncoded());
+  }
+
+  @Override
+  public Metadata getMetadata(String name) throws IOException {
+    if (cache.containsKey(name)) {
+      return cache.get(name);
+    }
+    try {
+      if (!keyStore.containsAlias(name)) {
+        return null;
+      }
+      Metadata meta = ((KeyMetadata) keyStore.getKey(name, password)).metadata;
+      cache.put(name, meta);
+      return meta;
+    } catch (KeyStoreException e) {
+      throw new IOException("Can't get metadata for " + name +
+          " from keystore " + path, e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException("Can't get algorithm for " + name +
+          " from keystore " + path, e);
+    } catch (UnrecoverableKeyException e) {
+      throw new IOException("Can't recover key for " + name +
+          " from keystore " + path, e);
+    }
+  }
+
+  @Override
+  public KeyVersion createKey(String name, byte[] material,
+                               Options options) throws IOException {
+    try {
+      if (keyStore.containsAlias(name) || cache.containsKey(name)) {
+        throw new IOException("Key " + name + " already exists in " + this);
+      }
+    } catch (KeyStoreException e) {
+      throw new IOException("Problem looking up key " + name + " in " + this,
+          e);
+    }
+    Metadata meta = new Metadata(options.getCipher(), options.getBitLength(),
+        new Date(), 1);
+    if (options.getBitLength() != 8 * material.length) {
+      throw new IOException("Wrong key length. Required " +
+          options.getBitLength() + ", but got " + (8 * material.length));
+    }
+    cache.put(name, meta);
+    String versionName = buildVersionName(name, 0);
+    return innerSetKeyVersion(versionName, material, meta.getCipher());
+  }
+
+  @Override
+  public void deleteKey(String name) throws IOException {
+    Metadata meta = getMetadata(name);
+    if (meta == null) {
+      throw new IOException("Key " + name + " does not exist in " + this);
+    }
+    for(int v=0; v < meta.getVersions(); ++v) {
+      String versionName = buildVersionName(name, v);
+      try {
+        if (keyStore.containsAlias(versionName)) {
+          keyStore.deleteEntry(versionName);
+        }
+      } catch (KeyStoreException e) {
+        throw new IOException("Problem removing " + versionName + " from " +
+            this, e);
+      }
+    }
+    try {
+      if (keyStore.containsAlias(name)) {
+        keyStore.deleteEntry(name);
+      }
+    } catch (KeyStoreException e) {
+      throw new IOException("Problem removing " + name + " from " + this, e);
+    }
+    cache.remove(name);
+    changed = true;
+  }
+
+  KeyVersion innerSetKeyVersion(String versionName, byte[] material,
+                                String cipher) throws IOException {
+    try {
+      keyStore.setKeyEntry(versionName, new SecretKeySpec(material, cipher),
+          password, null);
+    } catch (KeyStoreException e) {
+      throw new IOException("Can't store key " + versionName + " in " + this,
+          e);
+    }
+    changed = true;
+    return new KeyVersion(versionName, material);
+  }
+
+  @Override
+  public KeyVersion rollNewVersion(String name,
+                                    byte[] material) throws IOException {
+    Metadata meta = getMetadata(name);
+    if (meta == null) {
+      throw new IOException("Key " + name + " not found");
+    }
+    if (meta.getBitLength() != 8 * material.length) {
+      throw new IOException("Wrong key length. Required " +
+          meta.getBitLength() + ", but got " + (8 * material.length));
+    }
+    int nextVersion = meta.addVersion();
+    String versionName = buildVersionName(name, nextVersion);
+    return innerSetKeyVersion(versionName, material, meta.getCipher());
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!changed) {
+      return;
+    }
+    // put all of the updates into the keystore
+    for(Map.Entry<String, Metadata> entry: cache.entrySet()) {
+      try {
+        keyStore.setKeyEntry(entry.getKey(), new KeyMetadata(entry.getValue()),
+            password, null);
+      } catch (KeyStoreException e) {
+        throw new IOException("Can't set metadata key " + entry.getKey(),e );
+      }
+    }
+    // write out the keystore
+    FSDataOutputStream out = fs.create(path, true);
+    try {
+      keyStore.store(out, password);
+    } catch (KeyStoreException e) {
+      throw new IOException("Can't store keystore " + this, e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException("No such algorithm storing keystore " + this, e);
+    } catch (CertificateException e) {
+      throw new IOException("Certificate exception storing keystore " + this,
+          e);
+    }
+    out.close();
+    changed = false;
+  }
+
+  @Override
+  public String toString() {
+    return uri.toString();
+  }
+
+  /**
+   * The factory to create JksProviders, which is used by the ServiceLoader.
+   */
+  public static class Factory extends KeyProviderFactory {
+    @Override
+    public KeyProvider createProvider(URI providerName,
+                                      Configuration conf) throws IOException {
+      if (SCHEME_NAME.equals(providerName.getScheme())) {
+        return new JavaKeyStoreProvider(providerName, conf);
+      }
+      return null;
+    }
+  }
+
+  /**
+   * An adapter between a KeyStore Key and our Metadata. This is used to store
+   * the metadata in a KeyStore even though isn't really a key.
+   */
+  public static class KeyMetadata implements Key, Serializable {
+    private Metadata metadata;
+    private final static long serialVersionUID = 8405872419967874451L;
+
+    private KeyMetadata(Metadata meta) {
+      this.metadata = meta;
+    }
+
+    @Override
+    public String getAlgorithm() {
+      return metadata.getCipher();
+    }
+
+    @Override
+    public String getFormat() {
+      return "KeyMetadata";
+    }
+
+    @Override
+    public byte[] getEncoded() {
+      return new byte[0];
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+      byte[] serialized = metadata.serialize();
+      out.writeInt(serialized.length);
+      out.write(serialized);
+    }
+
+    private void readObject(ObjectInputStream in
+                            ) throws IOException, ClassNotFoundException {
+      byte[] buf = new byte[in.readInt()];
+      in.readFully(buf);
+      metadata = new Metadata(buf);
+    }
+
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProvider.java?rev=1552462&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProvider.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProvider.java Fri Dec 20 00:25:42 2013
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.crypto.key;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.util.Date;
+import java.util.List;
+
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A provider of secret key material for Hadoop applications. Provides an
+ * abstraction to separate key storage from users of encryption. It
+ * is intended to support getting or storing keys in a variety of ways,
+ * including third party bindings.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class KeyProvider {
+  public static final String DEFAULT_CIPHER_NAME =
+      "hadoop.security.key.default.cipher";
+  public static final String DEFAULT_CIPHER = "AES/CTR/NoPadding";
+  public static final String DEFAULT_BITLENGTH_NAME =
+      "hadoop.security.key.default.bitlength";
+  public static final int DEFAULT_BITLENGTH = 256;
+
+  /**
+   * The combination of both the key version name and the key material.
+   */
+  public static class KeyVersion {
+    private final String versionName;
+    private final byte[] material;
+
+    protected KeyVersion(String versionName,
+                         byte[] material) {
+      this.versionName = versionName;
+      this.material = material;
+    }
+
+    public String getVersionName() {
+      return versionName;
+    }
+
+    public byte[] getMaterial() {
+      return material;
+    }
+
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      buf.append("key(");
+      buf.append(versionName);
+      buf.append(")=");
+      if (material == null) {
+        buf.append("null");
+      } else {
+        for(byte b: material) {
+          buf.append(' ');
+          int right = b & 0xff;
+          if (right < 0x10) {
+            buf.append('0');
+          }
+          buf.append(Integer.toHexString(right));
+        }
+      }
+      return buf.toString();
+    }
+  }
+
+  /**
+   * Key metadata that is associated with the key.
+   */
+  public static class Metadata {
+    private final static String CIPHER_FIELD = "cipher";
+    private final static String BIT_LENGTH_FIELD = "bitLength";
+    private final static String CREATED_FIELD = "created";
+    private final static String VERSIONS_FIELD = "versions";
+
+    private final String cipher;
+    private final int bitLength;
+    private final Date created;
+    private int versions;
+
+    protected Metadata(String cipher, int bitLength,
+                       Date created, int versions) {
+      this.cipher = cipher;
+      this.bitLength = bitLength;
+      this.created = created;
+      this.versions = versions;
+    }
+
+    public Date getCreated() {
+      return created;
+    }
+
+    public String getCipher() {
+      return cipher;
+    }
+
+    /**
+     * Get the algorithm from the cipher.
+     * @return the algorithm name
+     */
+    public String getAlgorithm() {
+      int slash = cipher.indexOf('/');
+      if (slash == - 1) {
+        return cipher;
+      } else {
+        return cipher.substring(0, slash);
+      }
+    }
+
+    public int getBitLength() {
+      return bitLength;
+    }
+
+    public int getVersions() {
+      return versions;
+    }
+
+    protected int addVersion() {
+      return versions++;
+    }
+
+    /**
+     * Serialize the metadata to a set of bytes.
+     * @return the serialized bytes
+     * @throws IOException
+     */
+    protected byte[] serialize() throws IOException {
+      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+      JsonWriter writer = new JsonWriter(new OutputStreamWriter(buffer));
+      writer.beginObject();
+      if (cipher != null) {
+        writer.name(CIPHER_FIELD).value(cipher);
+      }
+      if (bitLength != 0) {
+        writer.name(BIT_LENGTH_FIELD).value(bitLength);
+      }
+      if (created != null) {
+        writer.name(CREATED_FIELD).value(created.getTime());
+      }
+      writer.name(VERSIONS_FIELD).value(versions);
+      writer.endObject();
+      writer.flush();
+      return buffer.toByteArray();
+    }
+
+    /**
+     * Deserialize a new metadata object from a set of bytes.
+     * @param bytes the serialized metadata
+     * @throws IOException
+     */
+    protected Metadata(byte[] bytes) throws IOException {
+      String cipher = null;
+      int bitLength = 0;
+      Date created = null;
+      int versions = 0;
+      JsonReader reader = new JsonReader(new InputStreamReader
+          (new ByteArrayInputStream(bytes)));
+      reader.beginObject();
+      while (reader.hasNext()) {
+        String field = reader.nextName();
+        if (CIPHER_FIELD.equals(field)) {
+          cipher = reader.nextString();
+        } else if (BIT_LENGTH_FIELD.equals(field)) {
+          bitLength = reader.nextInt();
+        } else if (CREATED_FIELD.equals(field)) {
+          created = new Date(reader.nextLong());
+        } else if (VERSIONS_FIELD.equals(field)) {
+          versions = reader.nextInt();
+        }
+      }
+      reader.endObject();
+      this.cipher = cipher;
+      this.bitLength = bitLength;
+      this.created = created;
+      this.versions = versions;
+    }
+  }
+
+  /**
+   * Options when creating key objects.
+   */
+  public static class Options {
+    private String cipher;
+    private int bitLength;
+
+    public Options(Configuration conf) {
+      cipher = conf.get(DEFAULT_CIPHER_NAME, DEFAULT_CIPHER);
+      bitLength = conf.getInt(DEFAULT_BITLENGTH_NAME, DEFAULT_BITLENGTH);
+    }
+
+    public Options setCipher(String cipher) {
+      this.cipher = cipher;
+      return this;
+    }
+
+    public Options setBitLength(int bitLength) {
+      this.bitLength = bitLength;
+      return this;
+    }
+
+    protected String getCipher() {
+      return cipher;
+    }
+
+    protected int getBitLength() {
+      return bitLength;
+    }
+  }
+
+  /**
+   * A helper function to create an options object.
+   * @param conf the configuration to use
+   * @return a new options object
+   */
+  public static Options options(Configuration conf) {
+    return new Options(conf);
+  }
+
+  /**
+   * Get the key material for a specific version of the key. This method is used
+   * when decrypting data.
+   * @param versionName the name of a specific version of the key
+   * @return the key material
+   * @throws IOException
+   */
+  public abstract KeyVersion getKeyVersion(String versionName
+                                            ) throws IOException;
+
+  /**
+   * Get the current version of the key, which should be used for encrypting new
+   * data.
+   * @param name the base name of the key
+   * @return the version name of the current version of the key or null if the
+   *    key version doesn't exist
+   * @throws IOException
+   */
+  public KeyVersion getCurrentKey(String name) throws IOException {
+    Metadata meta = getMetadata(name);
+    if (meta == null) {
+      return null;
+    }
+    return getKeyVersion(buildVersionName(name, meta.getVersions() - 1));
+  }
+
+  /**
+   * Get metadata about the key.
+   * @param name the basename of the key
+   * @return the key's metadata or null if the key doesn't exist
+   * @throws IOException
+   */
+  public abstract Metadata getMetadata(String name) throws IOException;
+
+  /**
+   * Create a new key. The given key must not already exist.
+   * @param name the base name of the key
+   * @param material the key material for the first version of the key.
+   * @param options the options for the new key.
+   * @return the version name of the first version of the key.
+   * @throws IOException
+   */
+  public abstract KeyVersion createKey(String name, byte[] material,
+                                       Options options) throws IOException;
+
+  /**
+   * Delete the given key.
+   * @param name the name of the key to delete
+   * @throws IOException
+   */
+  public abstract void deleteKey(String name) throws IOException;
+
+  /**
+   * Roll a new version of the given key.
+   * @param name the basename of the key
+   * @param material the new key material
+   * @return the name of the new version of the key
+   * @throws IOException
+   */
+  public abstract KeyVersion rollNewVersion(String name,
+                                             byte[] material
+                                            ) throws IOException;
+
+  /**
+   * Ensures that any changes to the keys are written to persistent store.
+   * @throws IOException
+   */
+  public abstract void flush() throws IOException;
+
+  /**
+   * Split the versionName in to a base name. Converts "/aaa/bbb/3" to
+   * "/aaa/bbb".
+   * @param versionName the version name to split
+   * @return the base name of the key
+   * @throws IOException
+   */
+  public static String getBaseName(String versionName) throws IOException {
+    int div = versionName.lastIndexOf('@');
+    if (div == -1) {
+      throw new IOException("No version in key path " + versionName);
+    }
+    return versionName.substring(0, div);
+  }
+
+  /**
+   * Build a version string from a basename and version number. Converts
+   * "/aaa/bbb" and 3 to "/aaa/bbb@3".
+   * @param name the basename of the key
+   * @param version the version of the key
+   * @return the versionName of the key.
+   */
+  protected static String buildVersionName(String name, int version) {
+    return name + "@" + version;
+  }
+
+  /**
+   * Convert a nested URI to decode the underlying path. The translation takes
+   * the authority and parses it into the underlying scheme and authority.
+   * For example, "myscheme://hdfs@nn/my/path" is converted to
+   * "hdfs://nn/my/path".
+   * @param nestedUri the URI from the nested URI
+   * @return the unnested path
+   */
+  public static Path unnestUri(URI nestedUri) {
+    String[] parts = nestedUri.getAuthority().split("@", 2);
+    StringBuilder result = new StringBuilder(parts[0]);
+    result.append("://");
+    if (parts.length == 2) {
+      result.append(parts[1]);
+    }
+    result.append(nestedUri.getPath());
+    if (nestedUri.getQuery() != null) {
+      result.append("?");
+      result.append(nestedUri.getQuery());
+    }
+    if (nestedUri.getFragment() != null) {
+      result.append("#");
+      result.append(nestedUri.getFragment());
+    }
+    return new Path(result.toString());
+  }
+
+  /**
+   * Find the provider with the given key.
+   * @param providerList the list of providers
+   * @param keyName the key name we are looking for
+   * @return the KeyProvider that has the key
+   */
+  public static KeyProvider findProvider(List<KeyProvider> providerList,
+                                         String keyName) throws IOException {
+    for(KeyProvider provider: providerList) {
+      if (provider.getMetadata(keyName) != null) {
+        return provider;
+      }
+    }
+    throw new IOException("Can't find KeyProvider for key " + keyName);
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java?rev=1552462&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java Fri Dec 20 00:25:42 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.crypto.key;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A factory to create a list of KeyProvider based on the path given in a
+ * Configuration. It uses a service loader interface to find the available
+ * KeyProviders and create them based on the list of URIs.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class KeyProviderFactory {
+  public static final String KEY_PROVIDER_PATH =
+      "hadoop.security.key.provider.path";
+
+  public abstract KeyProvider createProvider(URI providerName,
+                                             Configuration conf
+                                             ) throws IOException;
+
+  private static final ServiceLoader<KeyProviderFactory> serviceLoader =
+      ServiceLoader.load(KeyProviderFactory.class);
+
+  public static List<KeyProvider> getProviders(Configuration conf
+                                               ) throws IOException {
+    List<KeyProvider> result = new ArrayList<KeyProvider>();
+    for(String path: conf.getStringCollection(KEY_PROVIDER_PATH)) {
+      try {
+        URI uri = new URI(path);
+        boolean found = false;
+        for(KeyProviderFactory factory: serviceLoader) {
+          KeyProvider kp = factory.createProvider(uri, conf);
+          if (kp != null) {
+            result.add(kp);
+            found = true;
+            break;
+          }
+        }
+        if (!found) {
+          throw new IOException("No KeyProviderFactory for " + uri + " in " +
+              KEY_PROVIDER_PATH);
+        }
+      } catch (URISyntaxException error) {
+        throw new IOException("Bad configuration of " + KEY_PROVIDER_PATH +
+            " at " + path, error);
+      }
+    }
+    return result;
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/UserProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/UserProvider.java?rev=1552462&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/UserProvider.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/UserProvider.java Fri Dec 20 00:25:42 2013
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.crypto.key;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * A KeyProvider factory for UGIs. It uses the credentials object associated
+ * with the current user to find keys. This provider is created using a
+ * URI of "user:///".
+ */
+@InterfaceAudience.Private
+public class UserProvider extends KeyProvider {
+  public static final String SCHEME_NAME = "user";
+  private final UserGroupInformation user;
+  private final Credentials credentials;
+  private final Map<String, Metadata> cache = new HashMap<String, Metadata>();
+
+  private UserProvider() throws IOException {
+    user = UserGroupInformation.getCurrentUser();
+    credentials = user.getCredentials();
+  }
+
+  @Override
+  public KeyVersion getKeyVersion(String versionName) {
+    byte[] bytes = credentials.getSecretKey(new Text(versionName));
+    if (bytes == null) {
+      return null;
+    }
+    return new KeyVersion(versionName, bytes);
+  }
+
+  @Override
+  public Metadata getMetadata(String name) throws IOException {
+    if (cache.containsKey(name)) {
+      return cache.get(name);
+    }
+    byte[] serialized = credentials.getSecretKey(new Text(name));
+    if (serialized == null) {
+      return null;
+    }
+    Metadata result = new Metadata(serialized);
+    cache.put(name, result);
+    return result;
+  }
+
+  @Override
+  public KeyVersion createKey(String name, byte[] material,
+                               Options options) throws IOException {
+    Text nameT = new Text(name);
+    if (credentials.getSecretKey(nameT) != null) {
+      throw new IOException("Key " + name + " already exists in " + this);
+    }
+    if (options.getBitLength() != 8 * material.length) {
+      throw new IOException("Wrong key length. Required " +
+          options.getBitLength() + ", but got " + (8 * material.length));
+    }
+    Metadata meta = new Metadata(options.getCipher(), options.getBitLength(),
+        new Date(), 1);
+    cache.put(name, meta);
+    String versionName = buildVersionName(name, 0);
+    credentials.addSecretKey(nameT, meta.serialize());
+    credentials.addSecretKey(new Text(versionName), material);
+    return new KeyVersion(versionName, material);
+  }
+
+  @Override
+  public void deleteKey(String name) throws IOException {
+    Metadata meta = getMetadata(name);
+    if (meta == null) {
+      throw new IOException("Key " + name + " does not exist in " + this);
+    }
+    for(int v=0; v < meta.getVersions(); ++v) {
+      credentials.removeSecretKey(new Text(buildVersionName(name, v)));
+    }
+    credentials.removeSecretKey(new Text(name));
+    cache.remove(name);
+  }
+
+  @Override
+  public KeyVersion rollNewVersion(String name,
+                                    byte[] material) throws IOException {
+    Metadata meta = getMetadata(name);
+    if (meta == null) {
+      throw new IOException("Key " + name + " not found");
+    }
+    if (meta.getBitLength() != 8 * material.length) {
+      throw new IOException("Wrong key length. Required " +
+          meta.getBitLength() + ", but got " + (8 * material.length));
+    }
+    int nextVersion = meta.addVersion();
+    credentials.addSecretKey(new Text(name), meta.serialize());
+    String versionName = buildVersionName(name, nextVersion);
+    credentials.addSecretKey(new Text(versionName), material);
+    return new KeyVersion(versionName, material);
+  }
+
+  @Override
+  public String toString() {
+    return SCHEME_NAME + ":///";
+  }
+
+  @Override
+  public void flush() {
+    user.addCredentials(credentials);
+  }
+
+  public static class Factory extends KeyProviderFactory {
+
+    @Override
+    public KeyProvider createProvider(URI providerName,
+                                      Configuration conf) throws IOException {
+      if (SCHEME_NAME.equals(providerName.getScheme())) {
+        return new UserProvider();
+      }
+      return null;
+    }
+  }
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java?rev=1552462&r1=1552461&r2=1552462&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java Fri Dec 20 00:25:42 2013
@@ -133,7 +133,15 @@ public class Credentials implements Writ
   public void addSecretKey(Text alias, byte[] key) {
     secretKeysMap.put(alias, key);
   }
- 
+
+  /**
+   * Remove the key for a given alias.
+   * @param alias the alias for the key
+   */
+  public void removeSecretKey(Text alias) {
+    secretKeysMap.remove(alias);
+  }
+
   /**
    * Convenience method for reading a token storage file, and loading the Tokens
    * therein in the passed UGI

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory?rev=1552462&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory Fri Dec 20 00:25:42 2013
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.crypto.key.JavaKeyStoreProvider$Factory
+org.apache.hadoop.crypto.key.UserProvider$Factory

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProvider.java?rev=1552462&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProvider.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProvider.java Fri Dec 20 00:25:42 2013
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertArrayEquals;
+
+public class TestKeyProvider {
+
+  @Test
+  public void testBuildVersionName() throws Exception {
+    assertEquals("/a/b@3", KeyProvider.buildVersionName("/a/b", 3));
+    assertEquals("/aaa@12", KeyProvider.buildVersionName("/aaa", 12));
+  }
+
+  @Test
+  public void testParseVersionName() throws Exception {
+    assertEquals("/a/b", KeyProvider.getBaseName("/a/b@3"));
+    assertEquals("/aaa", KeyProvider.getBaseName("/aaa@112"));
+    try {
+      KeyProvider.getBaseName("no-slashes");
+      assertTrue("should have thrown", false);
+    } catch (IOException e) {
+      assertTrue(true);
+    }
+  }
+
+  @Test
+  public void testKeyMaterial() throws Exception {
+    byte[] key1 = new byte[]{1,2,3,4};
+    KeyProvider.KeyVersion obj = new KeyProvider.KeyVersion("key1@1", key1);
+    assertEquals("key1@1", obj.getVersionName());
+    assertArrayEquals(new byte[]{1,2,3,4}, obj.getMaterial());
+  }
+
+  @Test
+  public void testMetadata() throws Exception {
+    DateFormat format = new SimpleDateFormat("y/m/d");
+    Date date = format.parse("2013/12/25");
+    KeyProvider.Metadata meta = new KeyProvider.Metadata("myCipher", 100,
+        date, 123);
+    assertEquals("myCipher", meta.getCipher());
+    assertEquals(100, meta.getBitLength());
+    assertEquals(date, meta.getCreated());
+    assertEquals(123, meta.getVersions());
+    KeyProvider.Metadata second = new KeyProvider.Metadata(meta.serialize());
+    assertEquals(meta.getCipher(), second.getCipher());
+    assertEquals(meta.getBitLength(), second.getBitLength());
+    assertEquals(meta.getCreated(), second.getCreated());
+    assertEquals(meta.getVersions(), second.getVersions());
+    int newVersion = second.addVersion();
+    assertEquals(123, newVersion);
+    assertEquals(124, second.getVersions());
+    assertEquals(123, meta.getVersions());
+  }
+
+  @Test
+  public void testOptions() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(KeyProvider.DEFAULT_CIPHER_NAME, "myCipher");
+    conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 512);
+    KeyProvider.Options options = KeyProvider.options(conf);
+    assertEquals("myCipher", options.getCipher());
+    assertEquals(512, options.getBitLength());
+    options.setCipher("yourCipher");
+    options.setBitLength(128);
+    assertEquals("yourCipher", options.getCipher());
+    assertEquals(128, options.getBitLength());
+    options = KeyProvider.options(new Configuration());
+    assertEquals(KeyProvider.DEFAULT_CIPHER, options.getCipher());
+    assertEquals(KeyProvider.DEFAULT_BITLENGTH, options.getBitLength());
+  }
+
+  @Test
+  public void testUnnestUri() throws Exception {
+    assertEquals(new Path("hdfs://nn.example.com/my/path"),
+        KeyProvider.unnestUri(new URI("myscheme://hdfs@nn.example.com/my/path")));
+    assertEquals(new Path("hdfs://nn/my/path?foo=bar&baz=bat#yyy"),
+        KeyProvider.unnestUri(new URI("myscheme://hdfs@nn/my/path?foo=bar&baz=bat#yyy")));
+    assertEquals(new Path("inner://hdfs@nn1.example.com/my/path"),
+        KeyProvider.unnestUri(new URI("outer://inner@hdfs@nn1.example.com/my/path")));
+    assertEquals(new Path("user:///"),
+        KeyProvider.unnestUri(new URI("outer://user/")));
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderFactory.java?rev=1552462&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderFactory.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderFactory.java Fri Dec 20 00:25:42 2013
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestKeyProviderFactory {
+
+  private static final File tmpDir =
+      new File(System.getProperty("test.build.data", "/tmp"), "key");
+
+  @Test
+  public void testFactory() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
+        UserProvider.SCHEME_NAME + ":///," +
+            JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks");
+    List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
+    assertEquals(2, providers.size());
+    assertEquals(UserProvider.class, providers.get(0).getClass());
+    assertEquals(JavaKeyStoreProvider.class, providers.get(1).getClass());
+    assertEquals(UserProvider.SCHEME_NAME +
+        ":///", providers.get(0).toString());
+    assertEquals(JavaKeyStoreProvider.SCHEME_NAME +
+        "://file" + tmpDir + "/test.jks",
+        providers.get(1).toString());
+  }
+
+  @Test
+  public void testFactoryErrors() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, "unknown:///");
+    try {
+      List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
+      assertTrue("should throw!", false);
+    } catch (IOException e) {
+      assertEquals("No KeyProviderFactory for unknown:/// in " +
+          KeyProviderFactory.KEY_PROVIDER_PATH,
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testUriErrors() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, "unkn@own:/x/y");
+    try {
+      List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
+      assertTrue("should throw!", false);
+    } catch (IOException e) {
+      assertEquals("Bad configuration of " +
+          KeyProviderFactory.KEY_PROVIDER_PATH +
+          " at unkn@own:/x/y", e.getMessage());
+    }
+  }
+
+  static void checkSpecificProvider(Configuration conf,
+                                   String ourUrl) throws Exception {
+    KeyProvider provider = KeyProviderFactory.getProviders(conf).get(0);
+    byte[] key1 = new byte[32];
+    byte[] key2 = new byte[32];
+    byte[] key3 = new byte[32];
+    for(int i =0; i < key1.length; ++i) {
+      key1[i] = (byte) i;
+      key2[i] = (byte) (i * 2);
+      key3[i] = (byte) (i * 3);
+    }
+    // ensure that we get nulls when the key isn't there
+    assertEquals(null, provider.getKeyVersion("no-such-key"));
+    assertEquals(null, provider.getMetadata("key"));
+    // create a new key
+    try {
+      provider.createKey("key3", key3, KeyProvider.options(conf));
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+    // check the metadata for key3
+    KeyProvider.Metadata meta = provider.getMetadata("key3");
+    assertEquals(KeyProvider.DEFAULT_CIPHER, meta.getCipher());
+    assertEquals(KeyProvider.DEFAULT_BITLENGTH, meta.getBitLength());
+    assertEquals(1, meta.getVersions());
+    // make sure we get back the right key
+    assertArrayEquals(key3, provider.getCurrentKey("key3").getMaterial());
+    assertEquals("key3@0", provider.getCurrentKey("key3").getVersionName());
+    // try recreating key3
+    try {
+      provider.createKey("key3", key3, KeyProvider.options(conf));
+      assertTrue("should throw", false);
+    } catch (IOException e) {
+      assertEquals("Key key3 already exists in " + ourUrl, e.getMessage());
+    }
+    provider.deleteKey("key3");
+    try {
+      provider.deleteKey("key3");
+      assertTrue("should throw", false);
+    } catch (IOException e) {
+      assertEquals("Key key3 does not exist in " + ourUrl, e.getMessage());
+    }
+    provider.createKey("key3", key3, KeyProvider.options(conf));
+    try {
+      provider.createKey("key4", key3,
+          KeyProvider.options(conf).setBitLength(8));
+      assertTrue("should throw", false);
+    } catch (IOException e) {
+      assertEquals("Wrong key length. Required 8, but got 256", e.getMessage());
+    }
+    provider.createKey("key4", new byte[]{1},
+        KeyProvider.options(conf).setBitLength(8));
+    provider.rollNewVersion("key4", new byte[]{2});
+    meta = provider.getMetadata("key4");
+    assertEquals(2, meta.getVersions());
+    assertArrayEquals(new byte[]{2},
+        provider.getCurrentKey("key4").getMaterial());
+    assertArrayEquals(new byte[]{1},
+        provider.getKeyVersion("key4@0").getMaterial());
+    assertEquals("key4@1", provider.getCurrentKey("key4").getVersionName());
+    try {
+      provider.rollNewVersion("key4", key1);
+      assertTrue("should throw", false);
+    } catch (IOException e) {
+      assertEquals("Wrong key length. Required 8, but got 256", e.getMessage());
+    }
+    try {
+      provider.rollNewVersion("no-such-key", key1);
+      assertTrue("should throw", false);
+    } catch (IOException e) {
+      assertEquals("Key no-such-key not found", e.getMessage());
+    }
+    provider.flush();
+    // get a new instance of the provider to ensure it was saved correctly
+    provider = KeyProviderFactory.getProviders(conf).get(0);
+    assertArrayEquals(new byte[]{2},
+        provider.getCurrentKey("key4").getMaterial());
+    assertArrayEquals(key3, provider.getCurrentKey("key3").getMaterial());
+    assertEquals("key3@0", provider.getCurrentKey("key3").getVersionName());
+  }
+
+  @Test
+  public void testUserProvider() throws Exception {
+    Configuration conf = new Configuration();
+    final String ourUrl = UserProvider.SCHEME_NAME + ":///";
+    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, ourUrl);
+    checkSpecificProvider(conf, ourUrl);
+    // see if the credentials are actually in the UGI
+    Credentials credentials =
+        UserGroupInformation.getCurrentUser().getCredentials();
+    assertArrayEquals(new byte[]{1},
+        credentials.getSecretKey(new Text("key4@0")));
+    assertArrayEquals(new byte[]{2},
+        credentials.getSecretKey(new Text("key4@1")));
+  }
+
+  @Test
+  public void testJksProvider() throws Exception {
+    Configuration conf = new Configuration();
+    final String ourUrl =
+        JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks";
+    File file = new File(tmpDir, "test.jks");
+    file.delete();
+    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, ourUrl);
+    checkSpecificProvider(conf, ourUrl);
+    assertTrue(file + " should exist", file.isFile());
+  }
+}