You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/08/13 20:00:50 UTC

[accumulo] branch master updated: Provide new Crypto interface & impl (#560)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e14af6  Provide new Crypto interface & impl (#560)
5e14af6 is described below

commit 5e14af6c90be50db0a4c92c855bb002f95284931
Author: Mike Miller <mm...@apache.org>
AuthorDate: Mon Aug 13 16:00:48 2018 -0400

    Provide new Crypto interface & impl (#560)
    
    * Replaced old CryptoModule properties and code with new CryptoService
    * CryptoServiceFactory initializes a single CryptoService per tablet server only once
    * CryptoService then creates a FileEncrypter or FileDecrypter per file
    * Made new crypto properties operate at the instance level
    * Modified Initialize to use site config
    * Changed BCFile and DFSLogger to read crypto params
    * Added setProperty to Mini for testing
    * Implement AES/GCM crypto in new AESCryptoService
    * Added a custom CryptoException
    * Implement parameters parser for crypto service
    * Up size & time thresholds slightly for VolumeIT
    
    Co-authored-by: Nick Felts <31...@users.noreply.github.com>
---
 .../accumulo/core/client/rfile/RFileScanner.java   |   9 +-
 .../core/client/rfile/RFileSummariesRetriever.java |   4 +-
 .../accumulo/core/conf/ConfigSanityCheck.java      |  50 +-
 .../org/apache/accumulo/core/conf/Property.java    |  86 +--
 .../apache/accumulo/core/file/FileOperations.java  |  16 +
 .../file/blockfile/impl/CachableBlockFile.java     |  37 +-
 .../apache/accumulo/core/file/rfile/PrintInfo.java |   5 +-
 .../accumulo/core/file/rfile/RFileOperations.java  |   7 +-
 .../accumulo/core/file/rfile/SplitLarge.java       |  17 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java    | 347 ++-------
 .../accumulo/core/file/rfile/bcfile/PrintInfo.java |   7 +-
 .../CachingHDFSSecretKeyEncryptionStrategy.java    | 234 ------
 .../core/security/crypto/CryptoModule.java         | 121 ----
 .../core/security/crypto/CryptoModuleFactory.java  | 245 -------
 .../security/crypto/CryptoModuleParameters.java    | 725 -------------------
 .../core/security/crypto/CryptoServiceFactory.java |  74 ++
 ...aultCryptoModuleUtils.java => CryptoUtils.java} |  45 +-
 .../core/security/crypto/DefaultCryptoModule.java  | 488 -------------
 .../NonCachingSecretKeyEncryptionStrategy.java     | 204 ------
 .../crypto/SecretKeyEncryptionStrategy.java        |  27 -
 .../security/crypto/impl/AESCryptoService.java     | 518 +++++++++++++
 .../crypto/impl/CryptoEnvironmentImpl.java         |  42 ++
 .../core/security/crypto/impl/KeyManager.java      |  87 +++
 .../core/security/crypto/impl/NoCryptoService.java |  46 ++
 .../core/security/crypto/impl/NoFileDecrypter.java |  29 +
 .../core/security/crypto/impl/NoFileEncrypter.java |  37 +
 .../crypto/{ => streams}/BlockedInputStream.java   |   2 +-
 .../crypto/{ => streams}/BlockedOutputStream.java  |   4 +-
 .../{ => streams}/DiscardCloseOutputStream.java    |   2 +-
 .../crypto/{ => streams}/NoFlushOutputStream.java  |   2 +-
 .../{ => streams}/RFileCipherOutputStream.java     |   2 +-
 .../core/spi/crypto/CryptoEnvironment.java         |  35 +
 .../accumulo/core/spi/crypto/CryptoService.java    |  70 ++
 .../accumulo/core/spi/crypto/FileDecrypter.java    |  31 +
 .../accumulo/core/spi/crypto/FileEncrypter.java    |  44 ++
 .../org/apache/accumulo/core/summary/Gatherer.java |   7 +-
 .../accumulo/core/summary/SummaryReader.java       |  11 +-
 .../accumulo/core/conf/ConfigSanityCheckTest.java  | 103 +--
 .../core/file/rfile/CreateCompatTestFile.java      |   7 +-
 .../core/file/rfile/MultiLevelIndexTest.java       |   5 +-
 .../core/file/rfile/MultiThreadedRFileTest.java    |   7 +-
 .../apache/accumulo/core/file/rfile/RFileTest.java | 110 ++-
 .../core/security/crypto/BlockedIOStreamTest.java  |   2 +
 .../accumulo/core/security/crypto/CryptoTest.java  | 800 +++++++--------------
 .../src/test/resources/crypto-on-accumulo-site.xml |  91 +--
 .../crypto-on-no-key-encryption-accumulo-site.xml  | 111 ---
 .../minicluster/impl/MiniAccumuloConfigImpl.java   |   9 +
 .../apache/accumulo/server/init/Initialize.java    |  12 +-
 .../tserver/compaction/MajorCompactionRequest.java |   6 +-
 .../org/apache/accumulo/tserver/log/DfsLogger.java | 164 ++---
 .../test/MissingWalHeaderCompletesRecoveryIT.java  |   4 +-
 .../org/apache/accumulo/test/ShellConfigIT.java    |   5 +-
 .../java/org/apache/accumulo/test/VolumeIT.java    |   4 +-
 .../test/functional/WriteAheadLogEncryptedIT.java  | 104 +++
 .../UnusedWalDoesntCloseReplicationStatusIT.java   |   2 +-
 55 files changed, 1811 insertions(+), 3452 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index bece0a1..15b7543 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -57,10 +57,12 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.MultiIterator;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.BlockCacheManager;
 import org.apache.accumulo.core.spi.cache.CacheEntry;
 import org.apache.accumulo.core.spi.cache.CacheType;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.io.Text;
@@ -80,6 +82,7 @@ class RFileScanner extends ScannerOptions implements Scanner {
   private int batchSize = 1000;
   private long readaheadThreshold = 3;
   private AccumuloConfiguration tableConf;
+  private CryptoService cryptoService;
 
   static class Opts {
     InputArgs in;
@@ -215,6 +218,7 @@ class RFileScanner extends ScannerOptions implements Scanner {
     if (null == this.dataCache) {
       this.dataCache = new NoopCache();
     }
+    this.cryptoService = CryptoServiceFactory.getConfigured(tableConf);
   }
 
   @Override
@@ -353,8 +357,9 @@ class RFileScanner extends ScannerOptions implements Scanner {
       for (int i = 0; i < sources.length; i++) {
         // TODO may have been a bug with multiple files and caching in older version...
         FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
-        readers.add(new RFile.Reader(new CachableBlockFile.Reader("source-" + i, inputStream,
-            sources[i].getLength(), opts.in.getConf(), dataCache, indexCache, tableConf)));
+        readers.add(new RFile.Reader(
+            new CachableBlockFile.Reader("source-" + i, inputStream, sources[i].getLength(),
+                opts.in.getConf(), dataCache, indexCache, tableConf, cryptoService)));
       }
 
       if (getSamplerConfiguration() != null) {
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
index 7fd618c..7ae9b7c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.client.summary.Summary;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.summary.Gatherer;
 import org.apache.accumulo.core.summary.SummarizerFactory;
 import org.apache.accumulo.core.summary.SummaryCollection;
@@ -88,7 +89,8 @@ class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions
       SummaryCollection all = new SummaryCollection();
       for (RFileSource source : sources) {
         SummaryReader fileSummary = SummaryReader.load(conf, acuconf, source.getInputStream(),
-            source.getLength(), summarySelector, factory);
+            source.getLength(), summarySelector, factory,
+            CryptoServiceFactory.getConfigured(acuconf));
         SummaryCollection sc = fileSummary
             .getSummaries(Collections.singletonList(new Gatherer.RowRange(startRow, endRow)));
         all.merge(sc, factory);
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
index 1b778cc..1e0d43b 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
@@ -20,8 +20,7 @@ import java.io.IOException;
 import java.util.Map.Entry;
 import java.util.Objects;
 
-import org.apache.accumulo.core.security.crypto.CryptoModule;
-import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,9 +33,6 @@ public class ConfigSanityCheck {
 
   private static final Logger log = LoggerFactory.getLogger(ConfigSanityCheck.class);
   private static final String PREFIX = "BAD CONFIG ";
-  private static final String NULL_CIPHER = "NullCipher";
-  private static final String NULL_CRYPTO_MODULE = "NullCryptoModule";
-  private static final String NULL_SECRET_KEY_CRYPT_STRATEGY = "NullSecretKeyEncryptionStrategy";
   @SuppressWarnings("deprecation")
   private static final Property INSTANCE_DFS_URI = Property.INSTANCE_DFS_URI;
   @SuppressWarnings("deprecation")
@@ -56,10 +52,6 @@ public class ConfigSanityCheck {
   public static void validate(Iterable<Entry<String,String>> entries) {
     String instanceZkTimeoutValue = null;
     boolean usingVolumes = false;
-    String cipherSuite = NULL_CIPHER;
-    String keyAlgorithm = NULL_CIPHER;
-    String secretKeyEncryptionStrategy = NULL_SECRET_KEY_CRYPT_STRATEGY;
-    String cryptoModule = NULL_CRYPTO_MODULE;
     for (Entry<String,String> entry : entries) {
       String key = entry.getKey();
       String value = entry.getValue();
@@ -91,30 +83,9 @@ public class ConfigSanityCheck {
             + " must be greater than 0 and less than " + Integer.MAX_VALUE + " but was: " + bsize);
       }
 
-      if (key.equals(Property.CRYPTO_CIPHER_SUITE.getKey())) {
-        cipherSuite = Objects.requireNonNull(value);
-        Preconditions.checkArgument(
-            cipherSuite.equals(NULL_CIPHER) || cipherSuite.split("/").length == 3,
-            "Cipher suite must be NullCipher or in the form algorithm/mode/padding. Suite: "
-                + cipherSuite + " is invalid.");
-      }
-
-      if (key.equals(Property.CRYPTO_CIPHER_KEY_ALGORITHM_NAME.getKey())) {
-        keyAlgorithm = Objects.requireNonNull(value);
-      }
-
-      if (key.equals(Property.CRYPTO_MODULE_CLASS.getKey())) {
-        cryptoModule = Objects.requireNonNull(value);
-        if (!cryptoModule.equals(NULL_CRYPTO_MODULE)) {
-          verifyValidClassName(key, cryptoModule, CryptoModule.class);
-        }
-
-      }
-      if (key.equals(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey())) {
-        secretKeyEncryptionStrategy = Objects.requireNonNull(value);
-        if (!secretKeyEncryptionStrategy.equals(NULL_SECRET_KEY_CRYPT_STRATEGY)) {
-          verifyValidClassName(key, secretKeyEncryptionStrategy, SecretKeyEncryptionStrategy.class);
-        }
+      if (key.equals(Property.INSTANCE_CRYPTO_SERVICE.getKey())) {
+        String cryptoStrategy = Objects.requireNonNull(value);
+        verifyValidClassName(key, cryptoStrategy, CryptoService.class);
       }
     }
 
@@ -127,19 +98,6 @@ public class ConfigSanityCheck {
       log.warn("Use of {} and {} are deprecated. Consider using {} instead.", INSTANCE_DFS_URI,
           INSTANCE_DFS_DIR, Property.INSTANCE_VOLUMES);
     }
-
-    if ((cipherSuite.equals(NULL_CIPHER) || keyAlgorithm.equals(NULL_CIPHER))
-        && !cipherSuite.equals(keyAlgorithm)) {
-      fatal(Property.CRYPTO_CIPHER_SUITE.getKey() + " and "
-          + Property.CRYPTO_CIPHER_KEY_ALGORITHM_NAME + " must both be configured.");
-    }
-
-    if (cryptoModule.equals(NULL_CRYPTO_MODULE)
-        ^ secretKeyEncryptionStrategy.equals(NULL_SECRET_KEY_CRYPT_STRATEGY)) {
-      fatal(Property.CRYPTO_MODULE_CLASS.getKey() + " and "
-          + Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey()
-          + " must both be configured.");
-    }
   }
 
   private interface CheckTimeDuration {
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index caa638c..dd76695 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -45,76 +45,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 
 public enum Property {
-  // Crypto-related properties
-  @Experimental
-  CRYPTO_PREFIX("crypto.", null, PropertyType.PREFIX,
-      "Properties in this category related to the configuration of both default and custom crypto"
-          + " modules."),
-  @Experimental
-  CRYPTO_MODULE_CLASS("crypto.module.class", "NullCryptoModule", PropertyType.STRING,
-      "Fully qualified class name of the class that implements the CryptoModule"
-          + " interface, to be used in setting up encryption at rest for the WAL and"
-          + " (future) other parts of the code."),
-  @Experimental
-  CRYPTO_CIPHER_SUITE("crypto.cipher.suite", "NullCipher", PropertyType.STRING,
-      "Describes the cipher suite to use for rfile encryption. The value must"
-          + " be either NullCipher or in the form of algorithm/mode/padding, e.g."
-          + " AES/CBC/NoPadding"),
-  @Experimental
-  CRYPTO_WAL_CIPHER_SUITE("crypto.wal.cipher.suite", "", PropertyType.STRING,
-      "Describes the cipher suite to use for the write-ahead log. Defaults to"
-          + " 'cyrpto.cipher.suite' and will use that value for WAL encryption unless"
-          + " otherwise specified. Valid suite values include: an empty string,"
-          + " NullCipher, or a string the form of algorithm/mode/padding, e.g."
-          + " AES/CBC/NOPadding"),
-  @Experimental
-  CRYPTO_CIPHER_KEY_ALGORITHM_NAME("crypto.cipher.key.algorithm.name", "NullCipher",
-      PropertyType.STRING,
-      "States the name of the algorithm used for the key for the corresponding"
-          + " cipher suite. The key type must be compatible with the cipher suite."),
-  @Experimental
-  CRYPTO_BLOCK_STREAM_SIZE("crypto.block.stream.size", "1K", PropertyType.BYTES,
-      "The size of the buffer above the cipher stream. Used for reading files"
-          + " and padding walog entries."),
-  @Experimental
-  CRYPTO_CIPHER_KEY_LENGTH("crypto.cipher.key.length", "128", PropertyType.STRING,
-      "Specifies the key length *in bits* to use for the symmetric key, "
-          + "should probably be 128 or 256 unless you really know what you're doing"),
-  @Experimental
-  CRYPTO_SECURITY_PROVIDER("crypto.security.provider", "", PropertyType.STRING,
-      "States the security provider to use, and defaults to the system configured provider"),
-  @Experimental
-  CRYPTO_SECURE_RNG("crypto.secure.rng", "SHA1PRNG", PropertyType.STRING,
-      "States the secure random number generator to use, and defaults to the built-in SHA1PRNG"),
-  @Experimental
-  CRYPTO_SECURE_RNG_PROVIDER("crypto.secure.rng.provider", "SUN", PropertyType.STRING,
-      "States the secure random number generator provider to use."),
-  @Experimental
-  CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS("crypto.secret.key.encryption.strategy.class",
-      "NullSecretKeyEncryptionStrategy", PropertyType.STRING,
-      "The class Accumulo should use for its key encryption strategy."),
-  @Experimental
-  CRYPTO_DEFAULT_KEY_STRATEGY_KEY_LOCATION("crypto.default.key.strategy.key.location",
-      "/crypto/secret/keyEncryptionKey", PropertyType.ABSOLUTEPATH,
-      "The path relative to the top level instance directory (instance.dfs.dir) where to store"
-          + " the key encryption key within HDFS."),
-  @Experimental
-  CRYPTO_DEFAULT_KEY_STRATEGY_CIPHER_SUITE("crypto.default.key.strategy.cipher.suite", "NullCipher",
-      PropertyType.STRING,
-      "The cipher suite to use when encrypting session keys with a key"
-          + " encryption keyThis should be set to match the overall encryption"
-          + " algorithm but with ECB mode and no padding unless you really know what"
-          + " you're doing and are sure you won't break internal file formats"),
-  @Experimental
-  CRYPTO_OVERRIDE_KEY_STRATEGY_WITH_CONFIGURED_STRATEGY(
-      "crypto.override.key.strategy.with.configured.strategy", "false", PropertyType.BOOLEAN,
-      "The default behavior is to record the key encryption strategy with the"
-          + " encrypted file, and continue to use that strategy for the life of that"
-          + " file. Sometimes, you change your strategy and want to use the new"
-          + " strategy, not the old one. (Most commonly, this will be because you have"
-          + " moved key material from one spot to another.) If you want to override"
-          + " the recorded key strategy with the one in the configuration file, set"
-          + " this property to true."),
   // SSL properties local to each node (see also instance.ssl.enabled which must be consistent
   // across all nodes in an instance)
   RPC_PREFIX("rpc.", null, PropertyType.PREFIX,
@@ -238,6 +168,20 @@ public enum Property {
       PropertyType.STRING,
       "One-line configuration property controlling the network locations "
           + "(hostnames) that are allowed to impersonate other users"),
+  // Crypto-related properties
+  @Experimental
+  INSTANCE_CRYPTO_PREFIX("instance.crypto.opts.", null, PropertyType.PREFIX,
+      "Properties related to on-disk file encryption."),
+  @Experimental
+  @Sensitive
+  INSTANCE_CRYPTO_SENSITIVE_PREFIX("instance.crypto.opts.sensitive.", null, PropertyType.PREFIX,
+      "Sensitive properties related to on-disk file encryption."),
+  @Experimental
+  INSTANCE_CRYPTO_SERVICE("instance.crypto.service",
+      "org.apache.accumulo.core.security.crypto.impl.NoCryptoService", PropertyType.CLASSNAME,
+      "The class which executes on-disk file encryption. The default does nothing. To enable "
+          + "encryption, replace this classname with an implementation of the"
+          + "org.apache.accumulo.core.spi.crypto.CryptoService interface."),
 
   // general properties
   GENERAL_PREFIX("general.", null, PropertyType.PREFIX,
@@ -1197,7 +1141,7 @@ public enum Property {
       Property.TSERV_MAJC_MAXCONCURRENT, Property.REPLICATION_WORKER_THREADS,
       Property.TABLE_DURABILITY, Property.INSTANCE_ZK_TIMEOUT, Property.TABLE_CLASSPATH,
       Property.MASTER_METADATA_SUSPENDABLE, Property.TABLE_FAILURES_IGNORE,
-      Property.TABLE_SCAN_MAXMEM);
+      Property.TABLE_SCAN_MAXMEM, Property.INSTANCE_CRYPTO_SERVICE);
 
   private static final EnumSet<Property> fixedProperties = EnumSet.of(Property.TSERV_CLIENTPORT,
       Property.TSERV_NATIVEMAP_ENABLED, Property.TSERV_SCAN_MAX_OPENFILES,
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 4fce4a6..caccea7 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.spi.cache.BlockCache;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -395,6 +396,7 @@ public abstract class FileOperations {
     private BlockCache dataCache;
     private BlockCache indexCache;
     private Cache<String,Long> fileLenCache;
+    private CryptoService cryptoService;
 
     /**
      * (Optional) Set the block cache pair to be used to optimize reads within the constructed
@@ -429,6 +431,11 @@ public abstract class FileOperations {
       return (SubclassType) this;
     }
 
+    public SubclassType withCryptoService(CryptoService cryptoService) {
+      this.cryptoService = cryptoService;
+      return (SubclassType) this;
+    }
+
     public BlockCache getDataCache() {
       return dataCache;
     }
@@ -440,6 +447,10 @@ public abstract class FileOperations {
     public Cache<String,Long> getFileLenCache() {
       return fileLenCache;
     }
+
+    public CryptoService getCryptoService() {
+      return cryptoService;
+    }
   }
 
   /** Builder interface parallel to {@link FileReaderOperation}. */
@@ -463,6 +474,11 @@ public abstract class FileOperations {
      * (Optional) set the file len cache to be used to optimize reads within the constructed reader.
      */
     SubbuilderType withFileLenCache(Cache<String,Long> fileLenCache);
+
+    /**
+     * (Optional) set the crypto service to be used within the constructed reader.
+     */
+    SubbuilderType withCryptoService(CryptoService cryptoService);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index c7533e3..e49a0ce 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.BlockCache.Loader;
 import org.apache.accumulo.core.spi.cache.CacheEntry;
 import org.apache.accumulo.core.spi.cache.CacheEntry.Weighable;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -75,6 +76,7 @@ public class CachableBlockFile {
     private boolean closed = false;
     private final Configuration conf;
     private final AccumuloConfiguration accumuloConfiguration;
+    private final CryptoService cryptoService;
 
     private final IoeSupplier<InputStream> inputSupplier;
     private final IoeSupplier<Long> lengthSupplier;
@@ -105,11 +107,12 @@ public class CachableBlockFile {
         BCFile.Reader tmpReader = null;
         if (serializedMetadata == null) {
           if (fileLenCache == null) {
-            tmpReader = new BCFile.Reader(fsIn, lengthSupplier.get(), conf, accumuloConfiguration);
+            tmpReader = new BCFile.Reader(fsIn, lengthSupplier.get(), conf, accumuloConfiguration,
+                cryptoService);
           } else {
             long len = getCachedFileLen();
             try {
-              tmpReader = new BCFile.Reader(fsIn, len, conf, accumuloConfiguration);
+              tmpReader = new BCFile.Reader(fsIn, len, conf, accumuloConfiguration, cryptoService);
             } catch (Exception e) {
               log.debug("Failed to open {}, clearing file length cache and retrying", cacheId, e);
               fileLenCache.invalidate(cacheId);
@@ -117,11 +120,12 @@ public class CachableBlockFile {
 
             if (tmpReader == null) {
               len = getCachedFileLen();
-              tmpReader = new BCFile.Reader(fsIn, len, conf, accumuloConfiguration);
+              tmpReader = new BCFile.Reader(fsIn, len, conf, accumuloConfiguration, cryptoService);
             }
           }
         } else {
-          tmpReader = new BCFile.Reader(serializedMetadata, fsIn, conf);
+          tmpReader = new BCFile.Reader(serializedMetadata, fsIn, conf, accumuloConfiguration,
+              cryptoService);
         }
 
         if (!bcfr.compareAndSet(null, tmpReader)) {
@@ -297,7 +301,7 @@ public class CachableBlockFile {
     private Reader(String cacheId, IoeSupplier<InputStream> inputSupplier,
         IoeSupplier<Long> lenghtSupplier, Cache<String,Long> fileLenCache, BlockCache data,
         BlockCache index, RateLimiter readLimiter, Configuration conf,
-        AccumuloConfiguration accumuloConfiguration) {
+        AccumuloConfiguration accumuloConfiguration, CryptoService cryptoService) {
       Preconditions.checkArgument(cacheId != null || (data == null && index == null));
       this.cacheId = cacheId;
       this.inputSupplier = inputSupplier;
@@ -308,29 +312,36 @@ public class CachableBlockFile {
       this.readLimiter = readLimiter;
       this.conf = conf;
       this.accumuloConfiguration = accumuloConfiguration;
+      this.cryptoService = cryptoService;
     }
 
     public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data,
-        BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException {
-      this(fs, dataFile, conf, null, data, index, null, accumuloConfiguration);
+        BlockCache index, AccumuloConfiguration accumuloConfiguration, CryptoService cryptoService)
+        throws IOException {
+      this(fs, dataFile, conf, null, data, index, null, accumuloConfiguration, cryptoService);
     }
 
     public Reader(FileSystem fs, Path dataFile, Configuration conf, Cache<String,Long> fileLenCache,
         BlockCache data, BlockCache index, RateLimiter readLimiter,
-        AccumuloConfiguration accumuloConfiguration) throws IOException {
+        AccumuloConfiguration accumuloConfiguration, CryptoService cryptoService)
+        throws IOException {
       this(dataFile.toString(), () -> fs.open(dataFile), () -> fs.getFileStatus(dataFile).getLen(),
-          fileLenCache, data, index, readLimiter, conf, accumuloConfiguration);
+          fileLenCache, data, index, readLimiter, conf, accumuloConfiguration, cryptoService);
     }
 
     public <InputStreamType extends InputStream & Seekable> Reader(String cacheId,
         InputStreamType fsin, long len, Configuration conf, BlockCache data, BlockCache index,
-        AccumuloConfiguration accumuloConfiguration) throws IOException {
-      this(cacheId, () -> fsin, () -> len, null, data, index, null, conf, accumuloConfiguration);
+        AccumuloConfiguration accumuloConfiguration, CryptoService cryptoService)
+        throws IOException {
+      this(cacheId, () -> fsin, () -> len, null, data, index, null, conf, accumuloConfiguration,
+          cryptoService);
     }
 
     public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len,
-        Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-      this(null, () -> fsin, () -> len, null, null, null, null, conf, accumuloConfiguration);
+        Configuration conf, AccumuloConfiguration accumuloConfiguration,
+        CryptoService cryptoService) throws IOException {
+      this(null, () -> fsin, () -> len, null, null, null, null, conf, accumuloConfiguration,
+          cryptoService);
     }
 
     /**
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
index 53a43c7..f1fe0d8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
@@ -29,6 +30,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.summary.SummaryReader;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.start.spi.KeywordExecutable;
@@ -163,8 +165,9 @@ public class PrintInfo implements KeywordExecutable {
       System.out
           .println("Reading file: " + path.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
 
+      AccumuloConfiguration aconf = SiteConfiguration.getInstance();
       CachableBlockFile.Reader _rdr = new CachableBlockFile.Reader(fs, path, conf, null, null,
-          SiteConfiguration.getInstance());
+          aconf, CryptoServiceFactory.getConfigured(aconf));
       Reader iter = new RFile.Reader(_rdr);
       MetricsGatherer<Map<String,ArrayList<VisibilityMetric>>> vmg = new VisMetricsGatherer();
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index b4f82a3..fff8243 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -33,6 +33,8 @@ import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,7 +50,7 @@ public class RFileOperations extends FileOperations {
     CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(options.getFileSystem(),
         new Path(options.getFilename()), options.getConfiguration(), options.getFileLenCache(),
         options.getDataCache(), options.getIndexCache(), options.getRateLimiter(),
-        options.getTableConfiguration());
+        options.getTableConfiguration(), options.getCryptoService());
     return new RFile.Reader(_cbr);
   }
 
@@ -128,9 +130,10 @@ public class RFileOperations extends FileOperations {
 
       outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
     }
+    CryptoService cryptoService = CryptoServiceFactory.getConfigured(acuconf);
 
     BCFile.Writer _cbw = new BCFile.Writer(outputStream, options.getRateLimiter(), compression,
-        conf, acuconf);
+        conf, acuconf, cryptoService);
 
     return new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
index 48f2873..148cf7a 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
@@ -30,6 +30,8 @@ import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.core.file.rfile.RFile.Writer;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,9 +60,10 @@ public class SplitLarge {
 
     for (String file : opts.files) {
       AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
+      CryptoService cryptoService = CryptoServiceFactory.getConfigured(aconf);
       Path path = new Path(file);
-      CachableBlockFile.Reader rdr = new CachableBlockFile.Reader(fs, path, conf, null, null,
-          aconf);
+      CachableBlockFile.Reader rdr = new CachableBlockFile.Reader(fs, path, conf, null, null, aconf,
+          cryptoService);
       try (Reader iter = new RFile.Reader(rdr)) {
 
         if (!file.endsWith(".rf")) {
@@ -71,12 +74,10 @@ public class SplitLarge {
 
         int blockSize = (int) aconf.getAsBytes(Property.TABLE_FILE_BLOCK_SIZE);
         try (
-            Writer small = new RFile.Writer(
-                new BCFile.Writer(fs.create(new Path(smallName)), null, "gz", conf, aconf),
-                blockSize);
-            Writer large = new RFile.Writer(
-                new BCFile.Writer(fs.create(new Path(largeName)), null, "gz", conf, aconf),
-                blockSize)) {
+            Writer small = new RFile.Writer(new BCFile.Writer(fs.create(new Path(smallName)), null,
+                "gz", conf, aconf, cryptoService), blockSize);
+            Writer large = new RFile.Writer(new BCFile.Writer(fs.create(new Path(largeName)), null,
+                "gz", conf, aconf, cryptoService), blockSize)) {
           small.startDefaultLocalityGroup();
           large.startDefaultLocalityGroup();
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 9abbef6..0898cfe 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -17,7 +17,7 @@
 
 package org.apache.accumulo.core.file.rfile.bcfile;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.security.crypto.impl.CryptoEnvironmentImpl.Scope;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -32,22 +32,23 @@ import java.io.OutputStream;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
 import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
 import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
-import org.apache.accumulo.core.security.crypto.CryptoModule;
-import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
-import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
-import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.security.crypto.CryptoUtils;
+import org.apache.accumulo.core.security.crypto.impl.CryptoEnvironmentImpl;
+import org.apache.accumulo.core.security.crypto.impl.NoFileDecrypter;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.crypto.FileDecrypter;
+import org.apache.accumulo.core.spi.crypto.FileEncrypter;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -66,7 +67,20 @@ import org.apache.hadoop.io.compress.Decompressor;
 public final class BCFile {
   // the current version of BCFile impl, increment them (major or minor) made
   // enough changes
-  static final Version API_VERSION = new Version((short) 2, (short) 0);
+  /**
+   * Simplified encryption interface. Allows more flexible encryption.
+   *
+   * @since 2.0
+   */
+  static final Version API_VERSION_3 = new Version((short) 3, (short) 0);
+  /**
+   * Experimental crypto parameters, not flexible. Do not use.
+   */
+  static final Version API_VERSION_2 = new Version((short) 2, (short) 0);
+  /**
+   * Original BCFile version, prior to encryption. Also, any files before 2.0 that didn't have
+   * encryption were marked with this version.
+   */
   static final Version API_VERSION_1 = new Version((short) 1, (short) 0);
   static final Log LOG = LogFactory.getLog(BCFile.class);
 
@@ -94,9 +108,8 @@ public final class BCFile {
   static public class Writer implements Closeable {
     private final RateLimitedOutputStream out;
     private final Configuration conf;
-    private final CryptoModule cryptoModule;
-    private BCFileCryptoModuleParameters cryptoParams;
-    private SecretKeyEncryptionStrategy secretKeyEncryptionStrategy;
+    private FileEncrypter encrypter;
+    private CryptoEnvironmentImpl cryptoEnvironment;
     // the single meta block containing index of compressed data blocks
     final DataIndex dataIndex;
     // index for meta blocks
@@ -126,15 +139,9 @@ public final class BCFile {
       private final SimpleBufferedOutputStream fsBufferedOutput;
       private OutputStream out;
 
-      /**
-       * @param compressionAlgo
-       *          The compression algorithm to be used to for compression.
-       * @param cryptoModule
-       *          the module to use to obtain cryptographic streams
-       */
       public WBlockState(Algorithm compressionAlgo, RateLimitedOutputStream fsOut,
-          BytesWritable fsOutputBuffer, Configuration conf, CryptoModule cryptoModule,
-          CryptoModuleParameters cryptoParams) throws IOException {
+          BytesWritable fsOutputBuffer, Configuration conf, FileEncrypter encrypter)
+          throws IOException {
         this.compressAlgo = compressionAlgo;
         this.fsOut = fsOut;
         this.posStart = fsOut.position();
@@ -143,50 +150,10 @@ public final class BCFile {
 
         this.fsBufferedOutput = new SimpleBufferedOutputStream(this.fsOut,
             fsOutputBuffer.getBytes());
-
-        // *This* is very important. Without this, when the crypto stream is closed (in order to
-        // flush its last bytes),
-        // the underlying RFile stream will *also* be closed, and that's undesirable as the cipher
-        // stream is closed for
-        // every block written.
-        cryptoParams.setCloseUnderylingStreamAfterCryptoStreamClose(false);
-
-        // *This* is also very important. We don't want the underlying stream messed with.
-        cryptoParams.setRecordParametersToStream(false);
-
-        // Create a new IV for the block or update an existing one in the case of GCM
-        cryptoParams.updateInitializationVector();
-
-        // Initialize the cipher including generating a new IV
-        cryptoParams = cryptoModule.initializeCipher(cryptoParams);
-
-        // Write the init vector in plain text, uncompressed, to the output stream. Due to the way
-        // the streams work out, there's no good way to write this
-        // compressed, but it's pretty small.
-        DataOutputStream tempDataOutputStream = new DataOutputStream(fsBufferedOutput);
-
-        // Init vector might be null if the underlying cipher does not require one (NullCipher being
-        // a good example)
-        if (cryptoParams.getInitializationVector() != null) {
-          tempDataOutputStream.writeInt(cryptoParams.getInitializationVector().length);
-          tempDataOutputStream.write(cryptoParams.getInitializationVector());
-        } else {
-          // Do nothing
-        }
-
-        // Initialize the cipher stream and get the IV
-        cryptoParams.setPlaintextOutputStream(tempDataOutputStream);
-        cryptoParams = cryptoModule.getEncryptingOutputStream(cryptoParams);
-
-        if (cryptoParams.getEncryptedOutputStream() == tempDataOutputStream) {
-          this.cipherOut = fsBufferedOutput;
-        } else {
-          this.cipherOut = cryptoParams.getEncryptedOutputStream();
-        }
-
         this.compressor = compressAlgo.getCompressor();
 
         try {
+          this.cipherOut = encrypter.encryptStream(fsBufferedOutput);
           this.out = compressionAlgo.createCompressionStream(cipherOut, compressor, 0);
         } catch (IOException e) {
           compressAlgo.returnCompressor(compressor);
@@ -349,7 +316,8 @@ public final class BCFile {
      * @see Compression#getSupportedAlgorithms
      */
     public Writer(FSDataOutputStream fout, RateLimiter writeLimiter, String compressionName,
-        Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+        Configuration conf, AccumuloConfiguration aconf, CryptoService cryptoService)
+        throws IOException {
       if (fout.getPos() != 0) {
         throw new IOException("Output file not at zero offset.");
       }
@@ -360,22 +328,8 @@ public final class BCFile {
       metaIndex = new MetaIndex();
       fsOutputBuffer = new BytesWritable();
       Magic.write(this.out);
-
-      // Set up crypto-related detail, including secret key generation and encryption
-
-      this.cryptoModule = CryptoModuleFactory.getCryptoModule(accumuloConfiguration);
-      this.cryptoParams = new BCFileCryptoModuleParameters();
-      CryptoModuleFactory.fillParamsObjectFromConfiguration(cryptoParams, accumuloConfiguration);
-      this.cryptoParams = (BCFileCryptoModuleParameters) cryptoModule
-          .generateNewRandomSessionKey(cryptoParams);
-
-      this.secretKeyEncryptionStrategy = CryptoModuleFactory
-          .getSecretKeyEncryptionStrategy(accumuloConfiguration);
-      this.cryptoParams = (BCFileCryptoModuleParameters) secretKeyEncryptionStrategy
-          .encryptSecretKey(cryptoParams);
-
-      // secretKeyEncryptionStrategy.encryptSecretKey(cryptoParameters);
-
+      this.cryptoEnvironment = new CryptoEnvironmentImpl(Scope.RFILE, null);
+      this.encrypter = cryptoService.getFileEncrypter(this.cryptoEnvironment);
     }
 
     /**
@@ -403,20 +357,14 @@ public final class BCFile {
           long offsetIndexMeta = out.position();
           metaIndex.write(out);
 
-          if (cryptoParams.getCipherSuite() == null || cryptoParams.getCipherSuite()
-              .equals(Property.CRYPTO_CIPHER_SUITE.getDefaultValue())) {
-            out.writeLong(offsetIndexMeta);
-            API_VERSION_1.write(out);
-          } else {
-            long offsetCryptoParameters = out.position();
-            cryptoParams.write(out);
-
-            // Meta Index, crypto params offsets and the trailing section are written out directly.
-            out.writeLong(offsetIndexMeta);
-            out.writeLong(offsetCryptoParameters);
-            API_VERSION.write(out);
-          }
+          long offsetCryptoParameter = out.position();
+          byte[] cryptoParams = this.encrypter.getDecryptionParameters();
+          out.writeInt(cryptoParams.length);
+          out.write(cryptoParams);
 
+          out.writeLong(offsetIndexMeta);
+          out.writeLong(offsetCryptoParameter);
+          API_VERSION_3.write(out);
           Magic.write(out);
           out.flush();
           length = out.position();
@@ -442,8 +390,7 @@ public final class BCFile {
       }
 
       MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo);
-      WBlockState wbs = new WBlockState(compressAlgo, out, fsOutputBuffer, conf, cryptoModule,
-          cryptoParams);
+      WBlockState wbs = new WBlockState(compressAlgo, out, fsOutputBuffer, conf, encrypter);
       BlockAppender ba = new BlockAppender(mbr, wbs);
       blkInProgress = true;
       metaBlkSeen = true;
@@ -483,7 +430,7 @@ public final class BCFile {
       }
 
       WBlockState wbs = new WBlockState(getDefaultCompressionAlgorithm(), out, fsOutputBuffer, conf,
-          cryptoModule, cryptoParams);
+          encrypter);
       BlockAppender ba = new BlockAppender(wbs);
       blkInProgress = true;
       return ba;
@@ -508,76 +455,6 @@ public final class BCFile {
     }
   }
 
-  // sha256 of some random data
-  // @formatter:off
-  private static final byte[] NO_CPYPTO_KEY =
-    "ce18cf53c4c5077f771249b38033fa14bcb31cca0e5e95a371ee72daa8342ea2".getBytes(UTF_8);
-  // @formatter:on
-
-  // This class is used as a place holder in the cache for RFiles that have no crypto....
-  private static final BCFileCryptoModuleParameters NO_CRYPTO = new BCFileCryptoModuleParameters() {
-
-    @Override
-    public Map<String,String> getAllOptions() {
-      return Collections.emptyMap();
-    }
-
-    @Override
-    public byte[] getEncryptedKey() {
-      return NO_CPYPTO_KEY;
-    }
-
-    @Override
-    public String getOpaqueKeyEncryptionKeyID() {
-      // NONE + sha256 of random data
-      return "NONE:a4007e6aefb095a5a47030cd6c850818fb3a685dc6e85ba1ecc5a44ba68b193b";
-    }
-
-  };
-
-  private static class BCFileCryptoModuleParameters extends CryptoModuleParameters {
-
-    public void write(DataOutput out) throws IOException {
-      // Write out the context
-      out.writeInt(getAllOptions().size());
-      for (String key : getAllOptions().keySet()) {
-        out.writeUTF(key);
-        out.writeUTF(getAllOptions().get(key));
-      }
-
-      // Write the opaque ID
-      out.writeUTF(getOpaqueKeyEncryptionKeyID());
-
-      // Write the encrypted secret key
-      out.writeInt(getEncryptedKey().length);
-      out.write(getEncryptedKey());
-
-    }
-
-    public void read(DataInput in) throws IOException {
-
-      Map<String,String> optionsFromFile = new HashMap<>();
-
-      int numContextEntries = in.readInt();
-      for (int i = 0; i < numContextEntries; i++) {
-        optionsFromFile.put(in.readUTF(), in.readUTF());
-      }
-
-      CryptoModuleFactory.fillParamsObjectFromStringMap(this, optionsFromFile);
-
-      // Read opaque key encryption ID
-      setOpaqueKeyEncryptionKeyID(in.readUTF());
-
-      // Read encrypted secret key
-      int encryptedSecretKeyLength = in.readInt();
-      byte[] encryptedSecretKey = new byte[encryptedSecretKeyLength];
-      in.readFully(encryptedSecretKey);
-      setEncryptedKey(encryptedSecretKey);
-
-    }
-
-  }
-
   /**
    * BCFile Reader, interface to read the file's data and meta blocks.
    */
@@ -588,9 +465,8 @@ public final class BCFile {
     // Index for meta blocks
     final MetaIndex metaIndex;
     final Version version;
-    private BCFileCryptoModuleParameters cryptoParams;
-    private CryptoModule cryptoModule;
-    private SecretKeyEncryptionStrategy secretKeyEncryptionStrategy;
+    private byte[] decryptionParams;
+    private FileDecrypter decrypter;
 
     /**
      * Intermediate class that maintain the state of a Readable Compression Block.
@@ -603,8 +479,8 @@ public final class BCFile {
       private volatile boolean closed;
 
       public <InputStreamType extends InputStream & Seekable> RBlockState(Algorithm compressionAlgo,
-          InputStreamType fsin, BlockRegion region, Configuration conf, CryptoModule cryptoModule,
-          CryptoModuleParameters cryptoParams) throws IOException {
+          InputStreamType fsin, BlockRegion region, Configuration conf, FileDecrypter decrypter)
+          throws IOException {
         this.compressAlgo = compressionAlgo;
         this.region = region;
         this.decompressor = compressionAlgo.getDecompressor();
@@ -613,28 +489,8 @@ public final class BCFile {
             fsin, this.region.getOffset(), this.region.getCompressedSize());
         InputStream inputStreamToBeCompressed = boundedRangeFileInputStream;
 
-        if (cryptoParams != null && cryptoModule != null) {
-          DataInputStream tempDataInputStream = new DataInputStream(boundedRangeFileInputStream);
-          // Read the init vector from the front of the stream before initializing the cipher stream
-
-          int ivLength = tempDataInputStream.readInt();
-          byte[] initVector = new byte[ivLength];
-          tempDataInputStream.readFully(initVector);
-
-          cryptoParams.setInitializationVector(initVector);
-          cryptoParams.setEncryptedInputStream(boundedRangeFileInputStream);
-
-          // These two flags mirror those in WBlockState, and are very necessary to set in order
-          // that the underlying stream be written and handled
-          // correctly.
-          cryptoParams.setCloseUnderylingStreamAfterCryptoStreamClose(false);
-          cryptoParams.setRecordParametersToStream(false);
-
-          cryptoParams = cryptoModule.getDecryptingInputStream(cryptoParams);
-          inputStreamToBeCompressed = cryptoParams.getPlaintextInputStream();
-        }
-
         try {
+          inputStreamToBeCompressed = decrypter.decryptStream(inputStreamToBeCompressed);
           this.in = compressAlgo.createDecompressionStream(inputStreamToBeCompressed, decompressor,
               getFSInputBufferSize(conf));
         } catch (IOException e) {
@@ -729,12 +585,8 @@ public final class BCFile {
         if (out.size() > maxSize) {
           return null;
         }
-        if (cryptoParams == null) {
-          out.writeBoolean(false);
-        } else {
-          out.writeBoolean(true);
-          cryptoParams.write(out);
-        }
+
+        CryptoUtils.writeParams(this.decryptionParams, out);
 
         if (out.size() > maxSize) {
           return null;
@@ -756,8 +608,8 @@ public final class BCFile {
      *          Length of the corresponding file
      */
     public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fin,
-        long fileLength, Configuration conf, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
+        long fileLength, Configuration conf, AccumuloConfiguration aconf,
+        CryptoService cryptoService) throws IOException {
       this.in = new SeekableDataInputStream(fin);
       this.conf = conf;
 
@@ -766,9 +618,11 @@ public final class BCFile {
       version = new Version(this.in);
       Magic.readAndVerify(this.in);
 
-      // Do a version check
-      if (!version.compatibleWith(BCFile.API_VERSION) && !version.equals(BCFile.API_VERSION_1)) {
-        throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
+      // Do a version check - API_VERSION_2 used experimental crypto parameters, no longer supported
+      if (!version.compatibleWith(BCFile.API_VERSION_3)
+          && !version.compatibleWith(BCFile.API_VERSION_1)) {
+        throw new IOException("Unsupported BCFile Version found: " + version.toString() + ". "
+            + "Only support " + API_VERSION_1 + " or " + API_VERSION_3);
       }
 
       // Read the right number offsets based on version
@@ -776,11 +630,10 @@ public final class BCFile {
       long offsetCryptoParameters = 0;
 
       if (version.equals(API_VERSION_1)) {
-        this.in.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
+        this.in.seek(fileLength - Magic.size() - Version.size() - Long.BYTES);
         offsetIndexMeta = this.in.readLong();
-
       } else {
-        this.in.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
+        this.in.seek(fileLength - Magic.size() - Version.size() - (2 * Long.BYTES));
         offsetIndexMeta = this.in.readLong();
         offsetCryptoParameters = this.in.readLong();
       }
@@ -789,49 +642,21 @@ public final class BCFile {
       this.in.seek(offsetIndexMeta);
       metaIndex = new MetaIndex(this.in);
 
-      // If they exist, read the crypto parameters
-      if (!version.equals(BCFile.API_VERSION_1)) {
-
-        // read crypto parameters
-        this.in.seek(offsetCryptoParameters);
-        cryptoParams = new BCFileCryptoModuleParameters();
-        cryptoParams.read(this.in);
-
-        this.cryptoModule = CryptoModuleFactory.getCryptoModule(
-            cryptoParams.getAllOptions().get(Property.CRYPTO_MODULE_CLASS.getKey()));
-
-        // TODO: Do I need this? Hmmm, maybe I do.
-        if (accumuloConfiguration
-            .getBoolean(Property.CRYPTO_OVERRIDE_KEY_STRATEGY_WITH_CONFIGURED_STRATEGY)) {
-          Map<String,String> cryptoConfFromAccumuloConf = accumuloConfiguration
-              .getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);
-          Map<String,String> instanceConf = accumuloConfiguration
-              .getAllPropertiesWithPrefix(Property.INSTANCE_PREFIX);
-
-          cryptoConfFromAccumuloConf.putAll(instanceConf);
-
-          for (String name : cryptoParams.getAllOptions().keySet()) {
-            if (!name.equals(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey())) {
-              cryptoConfFromAccumuloConf.put(name, cryptoParams.getAllOptions().get(name));
-            } else {
-              cryptoParams.setKeyEncryptionStrategyClass(cryptoConfFromAccumuloConf
-                  .get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey()));
-            }
-          }
-
-          cryptoParams.setAllOptions(cryptoConfFromAccumuloConf);
-        }
-
-        this.secretKeyEncryptionStrategy = CryptoModuleFactory
-            .getSecretKeyEncryptionStrategy(cryptoParams.getKeyEncryptionStrategyClass());
-
-        // This call should put the decrypted session key within the cryptoParameters object
-        cryptoParams = (BCFileCryptoModuleParameters) secretKeyEncryptionStrategy
-            .decryptSecretKey(cryptoParams);
+      CryptoEnvironment cryptoEnvironment = null;
+      if (cryptoService == null) {
+        cryptoService = CryptoServiceFactory.getConfigured(aconf);
+      }
 
-        // secretKeyEncryptionStrategy.decryptSecretKey(cryptoParameters);
-      } else {
+      // backwards compatibility
+      if (version.equals(API_VERSION_1)) {
         LOG.trace("Found a version 1 file to read.");
+        this.decrypter = new NoFileDecrypter();
+      } else {
+        // read crypto parameters and get decrypter
+        this.in.seek(offsetCryptoParameters);
+        decryptionParams = CryptoUtils.readParams(this.in);
+        cryptoEnvironment = new CryptoEnvironmentImpl(Scope.RFILE, decryptionParams);
+        this.decrypter = cryptoService.getFileDecrypter(cryptoEnvironment);
       }
 
       // read data:BCFile.index, the data block index
@@ -841,7 +666,8 @@ public final class BCFile {
     }
 
     public <InputStreamType extends InputStream & Seekable> Reader(byte[] serializedMetadata,
-        InputStreamType fin, Configuration conf) throws IOException {
+        InputStreamType fin, Configuration conf, AccumuloConfiguration aconf,
+        CryptoService cryptoService) throws IOException {
       this.in = new SeekableDataInputStream(fin);
       this.conf = conf;
 
@@ -852,30 +678,13 @@ public final class BCFile {
 
       metaIndex = new MetaIndex(dis);
       dataIndex = new DataIndex(dis);
-      if (dis.readBoolean()) {
-        setupCryptoFromCachedData(dis);
-      }
-    }
 
-    private void setupCryptoFromCachedData(DataInput cachedCryptoParams) throws IOException {
-      BCFileCryptoModuleParameters params = new BCFileCryptoModuleParameters();
-      params.read(cachedCryptoParams);
-
-      if (Arrays.equals(params.getEncryptedKey(), NO_CRYPTO.getEncryptedKey())
-          && NO_CRYPTO.getOpaqueKeyEncryptionKeyID().equals(params.getOpaqueKeyEncryptionKeyID())) {
-        this.cryptoParams = null;
-        this.cryptoModule = null;
-        this.secretKeyEncryptionStrategy = null;
-      } else {
-        this.cryptoModule = CryptoModuleFactory
-            .getCryptoModule(params.getAllOptions().get(Property.CRYPTO_MODULE_CLASS.getKey()));
-        this.secretKeyEncryptionStrategy = CryptoModuleFactory
-            .getSecretKeyEncryptionStrategy(params.getKeyEncryptionStrategyClass());
-
-        // This call should put the decrypted session key within the cryptoParameters object
-        cryptoParams = (BCFileCryptoModuleParameters) secretKeyEncryptionStrategy
-            .decryptSecretKey(params);
+      decryptionParams = CryptoUtils.readParams(dis);
+      CryptoEnvironmentImpl env = new CryptoEnvironmentImpl(Scope.RFILE, decryptionParams);
+      if (cryptoService == null) {
+        cryptoService = CryptoServiceFactory.getConfigured(aconf);
       }
+      this.decrypter = cryptoService.getFileDecrypter(env);
     }
 
     /**
@@ -957,7 +766,7 @@ public final class BCFile {
 
     private BlockReader createReader(Algorithm compressAlgo, BlockRegion region)
         throws IOException {
-      RBlockState rbs = new RBlockState(compressAlgo, in, region, conf, cryptoModule, cryptoParams);
+      RBlockState rbs = new RBlockState(compressAlgo, in, region, conf, decrypter);
       return new BlockReader(rbs);
     }
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java
index 09c1207..7547ba2 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java
@@ -21,8 +21,10 @@ import java.io.PrintStream;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.MetaIndexEntry;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,10 +34,11 @@ public class PrintInfo {
   public static void printMetaBlockInfo(Configuration conf, FileSystem fs, Path path)
       throws IOException {
     FSDataInputStream fsin = fs.open(path);
+    AccumuloConfiguration aconf = SiteConfiguration.getInstance();
     BCFile.Reader bcfr = null;
     try {
-      bcfr = new BCFile.Reader(fsin, fs.getFileStatus(path).getLen(), conf,
-          SiteConfiguration.getInstance());
+      bcfr = new BCFile.Reader(fsin, fs.getFileStatus(path).getLen(), conf, aconf,
+          CryptoServiceFactory.getConfigured(aconf));
 
       Set<Entry<String,MetaIndexEntry>> es = bcfr.metaIndex.index.entrySet();
 
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CachingHDFSSecretKeyEncryptionStrategy.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CachingHDFSSecretKeyEncryptionStrategy.java
deleted file mode 100644
index 923ea0b..0000000
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CachingHDFSSecretKeyEncryptionStrategy.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.security.crypto;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.security.InvalidKeyException;
-import java.security.Key;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-
-import javax.crypto.Cipher;
-import javax.crypto.IllegalBlockSizeException;
-import javax.crypto.spec.SecretKeySpec;
-
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link SecretKeyEncryptionStrategy} that gets its key from HDFS and caches it for IO.
- */
-public class CachingHDFSSecretKeyEncryptionStrategy implements SecretKeyEncryptionStrategy {
-
-  private static final Logger log = LoggerFactory
-      .getLogger(CachingHDFSSecretKeyEncryptionStrategy.class);
-  private SecretKeyCache secretKeyCache = new SecretKeyCache();
-
-  @Override
-  public CryptoModuleParameters encryptSecretKey(CryptoModuleParameters context)
-      throws IOException {
-    try {
-      secretKeyCache.ensureSecretKeyCacheInitialized(context);
-      doKeyEncryptionOperation(Cipher.WRAP_MODE, context);
-    } catch (IOException e) {
-      log.error("{}", e.getMessage(), e);
-      throw new IOException(e);
-    }
-    return context;
-  }
-
-  @Override
-  public CryptoModuleParameters decryptSecretKey(CryptoModuleParameters context) {
-    try {
-      secretKeyCache.ensureSecretKeyCacheInitialized(context);
-      doKeyEncryptionOperation(Cipher.UNWRAP_MODE, context);
-    } catch (IOException e) {
-      log.error("{}", e.getMessage(), e);
-      throw new RuntimeException(e);
-    }
-    return context;
-  }
-
-  private void doKeyEncryptionOperation(int encryptionMode, CryptoModuleParameters params)
-      throws IOException {
-    Cipher cipher = DefaultCryptoModuleUtils.getCipher(
-        params.getAllOptions().get(Property.CRYPTO_DEFAULT_KEY_STRATEGY_CIPHER_SUITE.getKey()),
-        params.getSecurityProvider());
-
-    try {
-      cipher.init(encryptionMode,
-          new SecretKeySpec(secretKeyCache.getKeyEncryptionKey(), params.getKeyAlgorithmName()));
-    } catch (InvalidKeyException e) {
-      log.error("{}", e.getMessage(), e);
-      throw new RuntimeException(e);
-    }
-
-    if (Cipher.UNWRAP_MODE == encryptionMode) {
-      try {
-        Key plaintextKey = cipher.unwrap(params.getEncryptedKey(), params.getKeyAlgorithmName(),
-            Cipher.SECRET_KEY);
-        params.setPlaintextKey(plaintextKey.getEncoded());
-      } catch (InvalidKeyException | NoSuchAlgorithmException e) {
-        log.error("{}", e.getMessage(), e);
-        throw new RuntimeException(e);
-      }
-    } else {
-      Key plaintextKey = new SecretKeySpec(params.getPlaintextKey(), params.getKeyAlgorithmName());
-      try {
-        byte[] encryptedSecretKey = cipher.wrap(plaintextKey);
-        params.setEncryptedKey(encryptedSecretKey);
-        params.setOpaqueKeyEncryptionKeyID(secretKeyCache.getPathToKeyName());
-      } catch (InvalidKeyException | IllegalBlockSizeException e) {
-        log.error("{}", e.getMessage(), e);
-        throw new RuntimeException(e);
-      }
-
-    }
-  }
-
-  private static class SecretKeyCache {
-
-    private boolean initialized = false;
-    private byte[] keyEncryptionKey;
-    private String pathToKeyName;
-
-    public SecretKeyCache() {}
-
-    public synchronized void ensureSecretKeyCacheInitialized(CryptoModuleParameters context)
-        throws IOException {
-
-      if (initialized) {
-        return;
-      }
-
-      // First identify if the KEK already exists
-      pathToKeyName = getFullPathToKey(context);
-
-      if (pathToKeyName == null || pathToKeyName.equals("")) {
-        pathToKeyName = Property.CRYPTO_DEFAULT_KEY_STRATEGY_KEY_LOCATION.getDefaultValue();
-      }
-
-      // TODO ACCUMULO-2530 Ensure volumes are properly supported
-      Path pathToKey = new Path(pathToKeyName);
-      FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
-
-      DataInputStream in = null;
-      boolean invalidFile = false;
-      int keyEncryptionKeyLength = 0;
-
-      try {
-        if (!fs.exists(pathToKey)) {
-          initializeKeyEncryptionKey(fs, pathToKey, context);
-        }
-
-        in = fs.open(pathToKey);
-
-        keyEncryptionKeyLength = in.readInt();
-        // If the file length does not correctly relate to the expected key size, there is an
-        // inconsistency and
-        // we have no way of knowing the correct key length.
-        // The keyEncryptionKeyLength+4 accounts for the integer read from the file.
-        if (fs.getFileStatus(pathToKey).getLen() != keyEncryptionKeyLength + 4) {
-          invalidFile = true;
-          // Passing this exception forward so we can provide the more useful error message
-          throw new IOException();
-        }
-        keyEncryptionKey = new byte[keyEncryptionKeyLength];
-        in.readFully(keyEncryptionKey);
-
-        initialized = true;
-
-      } catch (EOFException e) {
-        throw new IOException(
-            "Could not initialize key encryption cache, malformed key encryption key file", e);
-      } catch (IOException e) {
-        if (invalidFile) {
-          throw new IOException("Could not initialize key encryption cache,"
-              + " malformed key encryption key file. Expected key of lengh "
-              + keyEncryptionKeyLength + " but file contained "
-              + (fs.getFileStatus(pathToKey).getLen() - 4) + "bytes for key encryption key.");
-        } else {
-          throw new IOException("Could not initialize key encryption cache,"
-              + " unable to access or find key encryption key file", e);
-        }
-      } finally {
-        IOUtils.closeQuietly(in);
-      }
-    }
-
-    private void initializeKeyEncryptionKey(FileSystem fs, Path pathToKey,
-        CryptoModuleParameters params) throws IOException {
-      DataOutputStream out = null;
-      try {
-        out = fs.create(pathToKey);
-        // Very important, lets hedge our bets
-        fs.setReplication(pathToKey, (short) 5);
-        SecureRandom random = DefaultCryptoModuleUtils.getSecureRandom(
-            params.getRandomNumberGenerator(), params.getRandomNumberGeneratorProvider());
-        int keyLength = params.getKeyLength();
-        byte[] newRandomKeyEncryptionKey = new byte[keyLength / 8];
-        random.nextBytes(newRandomKeyEncryptionKey);
-        out.writeInt(newRandomKeyEncryptionKey.length);
-        out.write(newRandomKeyEncryptionKey);
-        out.flush();
-      } finally {
-        if (out != null) {
-          out.close();
-        }
-      }
-
-    }
-
-    @SuppressWarnings("deprecation")
-    private String getFullPathToKey(CryptoModuleParameters params) {
-      String pathToKeyName = params.getAllOptions()
-          .get(Property.CRYPTO_DEFAULT_KEY_STRATEGY_KEY_LOCATION.getKey());
-      String instanceDirectory = params.getAllOptions().get(Property.INSTANCE_DFS_DIR.getKey());
-
-      if (pathToKeyName == null) {
-        pathToKeyName = Property.CRYPTO_DEFAULT_KEY_STRATEGY_KEY_LOCATION.getDefaultValue();
-      }
-
-      if (instanceDirectory == null) {
-        instanceDirectory = Property.INSTANCE_DFS_DIR.getDefaultValue();
-      }
-
-      if (!pathToKeyName.startsWith("/")) {
-        pathToKeyName = "/" + pathToKeyName;
-      }
-
-      return instanceDirectory + pathToKeyName;
-    }
-
-    public byte[] getKeyEncryptionKey() {
-      return keyEncryptionKey;
-    }
-
-    public String getPathToKeyName() {
-      return pathToKeyName;
-    }
-  }
-
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
deleted file mode 100644
index 323a5fd..0000000
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.security.crypto;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.crypto.Cipher;
-import javax.crypto.CipherInputStream;
-import javax.crypto.CipherOutputStream;
-
-/**
- * Classes that obey this interface may be used to provide encrypting and decrypting streams to the
- * rest of Accumulo. Classes that obey this interface may be configured as the crypto module by
- * setting the property crypto.module.class in the accumulo-site.xml file.
- *
- * When implementing CryptoModule, it is recommended that any {@link CipherOutputStream} uses
- * {@link RFileCipherOutputStream} instead.
- */
-public interface CryptoModule {
-
-  /**
-   * Takes a {@link CryptoModuleParameters} object containing an {@link OutputStream} to wrap within
-   * a {@link CipherOutputStream}. The various other parts of the {@link CryptoModuleParameters}
-   * object specify the details about the type of encryption to use. Callers should pay special
-   * attention to the {@link CryptoModuleParameters#getRecordParametersToStream()} and
-   * {@link CryptoModuleParameters#getCloseUnderylingStreamAfterCryptoStreamClose()} flags within
-   * the {@link CryptoModuleParameters} object, as they control whether or not this method will
-   * write to the given {@link OutputStream} in
-   * {@link CryptoModuleParameters#getPlaintextOutputStream()}.
-   *
-   * <p>
-   *
-   * This method returns a {@link CryptoModuleParameters} object. Implementers of this interface
-   * maintain a contract that the returned object is <i>the same</i> as the one passed in, always.
-   * Return values are enclosed within that object, as some other calls will typically return more
-   * than one value.
-   *
-   * @param params
-   *          the {@link CryptoModuleParameters} object that specifies how to set up the encrypted
-   *          stream.
-   * @return the same {@link CryptoModuleParameters} object with the
-   *         {@link CryptoModuleParameters#getEncryptedOutputStream()} set to a stream that is not
-   *         null. That stream may be exactly the same stream as
-   *         {@link CryptoModuleParameters#getPlaintextInputStream()} if the params object specifies
-   *         no cryptography.
-   */
-  CryptoModuleParameters getEncryptingOutputStream(CryptoModuleParameters params)
-      throws IOException;
-
-  /**
-   * Takes a {@link CryptoModuleParameters} object containing an {@link InputStream} to wrap within
-   * a {@link CipherInputStream}. The various other parts of the {@link CryptoModuleParameters}
-   * object specify the details about the type of encryption to use. Callers should pay special
-   * attention to the {@link CryptoModuleParameters#getRecordParametersToStream()} and
-   * {@link CryptoModuleParameters#getCloseUnderylingStreamAfterCryptoStreamClose()} flags within
-   * the {@link CryptoModuleParameters} object, as they control whether or not this method will read
-   * from the given {@link InputStream} in {@link CryptoModuleParameters#getEncryptedInputStream()}.
-   *
-   * <p>
-   *
-   * This method returns a {@link CryptoModuleParameters} object. Implementers of this interface
-   * maintain a contract that the returned object is <i>the same</i> as the one passed in, always.
-   * Return values are enclosed within that object, as some other calls will typically return more
-   * than one value.
-   *
-   * @param params
-   *          the {@link CryptoModuleParameters} object that specifies how to set up the encrypted
-   *          stream.
-   * @return the same {@link CryptoModuleParameters} object with the
-   *         {@link CryptoModuleParameters#getPlaintextInputStream()} set to a stream that is not
-   *         null. That stream may be exactly the same stream as
-   *         {@link CryptoModuleParameters#getEncryptedInputStream()} if the params object specifies
-   *         no cryptography.
-   */
-  CryptoModuleParameters getDecryptingInputStream(CryptoModuleParameters params) throws IOException;
-
-  /**
-   * Generates a random session key and sets it into the
-   * {@link CryptoModuleParameters#getPlaintextKey()} property. Saves callers from having to set up
-   * their own secure random provider. Also will set the
-   * {@link CryptoModuleParameters#getSecureRandom()} property if it has not already been set by
-   * some other function.
-   *
-   * @param params
-   *          a {@link CryptoModuleParameters} object contained a correctly instantiated set of
-   *          properties.
-   * @return the same {@link CryptoModuleParameters} object with the plaintext key set
-   */
-  CryptoModuleParameters generateNewRandomSessionKey(CryptoModuleParameters params);
-
-  /**
-   * Generates a {@link Cipher} object based on the parameters in the given
-   * {@link CryptoModuleParameters} object and places it into the
-   * {@link CryptoModuleParameters#getCipher()} property. Callers may choose to use this method if
-   * they want to get the initialization vector from the cipher before proceeding to create wrapped
-   * streams.
-   *
-   * @param params
-   *          a {@link CryptoModuleParameters} object contained a correctly instantiated set of
-   *          properties.
-   * @return the same {@link CryptoModuleParameters} object with the cipher set.
-   */
-  CryptoModuleParameters initializeCipher(CryptoModuleParameters params);
-
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
deleted file mode 100644
index 5f27cb4..0000000
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.security.crypto;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This factory module exists to assist other classes in loading crypto modules.
- */
-public class CryptoModuleFactory {
-
-  private static final Logger log = LoggerFactory.getLogger(CryptoModuleFactory.class);
-  private static final Map<String,CryptoModule> cryptoModulesCache = new HashMap<>();
-  private static final Map<String,SecretKeyEncryptionStrategy> secretKeyEncryptionStrategyCache = new HashMap<>();
-
-  /**
-   * This method returns a crypto module based on settings in the given configuration parameter.
-   *
-   * @return a class implementing the CryptoModule interface. It will *never* return null; rather,
-   *         it will return a class which obeys the interface but makes no changes to the underlying
-   *         data.
-   */
-  public static CryptoModule getCryptoModule(AccumuloConfiguration conf) {
-    String cryptoModuleClassname = conf.get(Property.CRYPTO_MODULE_CLASS);
-    return getCryptoModule(cryptoModuleClassname);
-  }
-
-  public static CryptoModule getCryptoModule(String cryptoModuleClassname) {
-
-    if (cryptoModuleClassname != null) {
-      cryptoModuleClassname = cryptoModuleClassname.trim();
-    }
-
-    if (cryptoModuleClassname == null || cryptoModuleClassname.equals("NullCryptoModule")) {
-      return new NullCryptoModule();
-    }
-
-    CryptoModule cryptoModule = null;
-    synchronized (cryptoModulesCache) {
-      if (cryptoModulesCache.containsKey(cryptoModuleClassname)) {
-        cryptoModule = cryptoModulesCache.get(cryptoModuleClassname);
-      } else {
-        cryptoModule = instantiateCryptoModule(cryptoModuleClassname);
-        cryptoModulesCache.put(cryptoModuleClassname, cryptoModule);
-      }
-    }
-
-    return cryptoModule;
-  }
-
-  private static CryptoModule instantiateCryptoModule(String cryptoModuleClassname) {
-    log.debug("About to instantiate crypto module {}", cryptoModuleClassname);
-
-    try {
-      CryptoModule cryptoModule = AccumuloVFSClassLoader.loadClass(cryptoModuleClassname)
-          .asSubclass(CryptoModule.class).newInstance();
-
-      log.debug("Successfully instantiated crypto module {}", cryptoModuleClassname);
-
-      return cryptoModule;
-    } catch (ClassNotFoundException e1) {
-      throw new IllegalArgumentException(
-          "Could not find configured crypto module " + cryptoModuleClassname);
-    } catch (ClassCastException cce) {
-      throw new IllegalArgumentException("Configured Accumulo crypto module "
-          + cryptoModuleClassname + " does not implement the CryptoModule interface.");
-    } catch (InstantiationException | IllegalAccessException e) {
-      throw new IllegalArgumentException(
-          "Unable to instantiate the crypto module: " + cryptoModuleClassname, e);
-    }
-  }
-
-  public static SecretKeyEncryptionStrategy getSecretKeyEncryptionStrategy(
-      AccumuloConfiguration conf) {
-    String className = conf.get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS);
-    return getSecretKeyEncryptionStrategy(className);
-  }
-
-  public static SecretKeyEncryptionStrategy getSecretKeyEncryptionStrategy(String className) {
-
-    if (className != null) {
-      className = className.trim();
-    }
-
-    if (className == null || className.equals("NullSecretKeyEncryptionStrategy")) {
-      return new NullSecretKeyEncryptionStrategy();
-    }
-
-    SecretKeyEncryptionStrategy strategy = null;
-    synchronized (secretKeyEncryptionStrategyCache) {
-      if (secretKeyEncryptionStrategyCache.containsKey(className)) {
-        strategy = secretKeyEncryptionStrategyCache.get(className);
-      } else {
-        strategy = instantiateSecreteKeyEncryptionStrategy(className);
-        secretKeyEncryptionStrategyCache.put(className, strategy);
-      }
-    }
-
-    return strategy;
-  }
-
-  private static SecretKeyEncryptionStrategy instantiateSecreteKeyEncryptionStrategy(
-      String className) {
-
-    log.debug("About to instantiate secret key encryption strategy {}", className);
-
-    try {
-      SecretKeyEncryptionStrategy strategy = AccumuloVFSClassLoader.loadClass(className)
-          .asSubclass(SecretKeyEncryptionStrategy.class).newInstance();
-
-      log.debug("Successfully instantiated secret key encryption strategy {}", className);
-
-      return strategy;
-    } catch (ClassNotFoundException e1) {
-      throw new IllegalArgumentException(
-          "Could not find configured secret key encryption strategy: " + className);
-    } catch (ClassCastException e) {
-      throw new IllegalArgumentException("Configured Accumulo secret key encryption strategy \""
-          + className + "\" does not implement the SecretKeyEncryptionStrategy interface.");
-    } catch (InstantiationException | IllegalAccessException e) {
-      throw new IllegalArgumentException(
-          "Unable to instantiate the secret key encryption strategy: " + className, e);
-    }
-  }
-
-  static class NullSecretKeyEncryptionStrategy implements SecretKeyEncryptionStrategy {
-
-    @Override
-    public CryptoModuleParameters encryptSecretKey(CryptoModuleParameters params) {
-      params.setEncryptedKey(params.getPlaintextKey());
-      params.setOpaqueKeyEncryptionKeyID("");
-
-      return params;
-    }
-
-    @Override
-    public CryptoModuleParameters decryptSecretKey(CryptoModuleParameters params) {
-      params.setPlaintextKey(params.getEncryptedKey());
-      return params;
-    }
-
-  }
-
-  static class NullCryptoModule implements CryptoModule {
-
-    @Override
-    public CryptoModuleParameters getEncryptingOutputStream(CryptoModuleParameters params)
-        throws IOException {
-      params.setEncryptedOutputStream(params.getPlaintextOutputStream());
-      return params;
-    }
-
-    @Override
-    public CryptoModuleParameters getDecryptingInputStream(CryptoModuleParameters params)
-        throws IOException {
-      params.setPlaintextInputStream(params.getEncryptedInputStream());
-      return params;
-    }
-
-    @Override
-    public CryptoModuleParameters generateNewRandomSessionKey(CryptoModuleParameters params) {
-      params.setPlaintextKey(new byte[0]);
-      return params;
-    }
-
-    @Override
-    public CryptoModuleParameters initializeCipher(CryptoModuleParameters params) {
-      return params;
-    }
-
-  }
-
-  public static CryptoModuleParameters createParamsObjectFromAccumuloConfiguration(
-      AccumuloConfiguration conf) {
-    CryptoModuleParameters params = new CryptoModuleParameters();
-
-    return fillParamsObjectFromConfiguration(params, conf);
-  }
-
-  public static CryptoModuleParameters fillParamsObjectFromConfiguration(
-      CryptoModuleParameters params, AccumuloConfiguration conf) {
-    // Get all the options from the configuration
-    Map<String,String> cryptoOpts = new HashMap<>(
-        conf.getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX));
-    cryptoOpts.putAll(conf.getAllPropertiesWithPrefix(Property.INSTANCE_PREFIX));
-    cryptoOpts.remove(Property.INSTANCE_SECRET.getKey());
-    cryptoOpts.put(Property.CRYPTO_BLOCK_STREAM_SIZE.getKey(),
-        Integer.toString((int) conf.getAsBytes(Property.CRYPTO_BLOCK_STREAM_SIZE)));
-
-    return fillParamsObjectFromStringMap(params, cryptoOpts);
-  }
-
-  public static CryptoModuleParameters fillParamsObjectFromStringMap(CryptoModuleParameters params,
-      Map<String,String> cryptoOpts) {
-    params.setCipherSuite(cryptoOpts.get(Property.CRYPTO_CIPHER_SUITE.getKey()));
-    // If no encryption has been specified, then we abort here.
-    if (params.getCipherSuite() == null || params.getCipherSuite().equals("NullCipher")) {
-      params.setAllOptions(cryptoOpts);
-
-      return params;
-    }
-
-    params.setAllOptions(cryptoOpts);
-
-    params.setKeyAlgorithmName(cryptoOpts.get(Property.CRYPTO_CIPHER_KEY_ALGORITHM_NAME.getKey()));
-    params.setKeyEncryptionStrategyClass(
-        cryptoOpts.get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey()));
-    params
-        .setKeyLength(Integer.parseInt(cryptoOpts.get(Property.CRYPTO_CIPHER_KEY_LENGTH.getKey())));
-    params.setOverrideStreamsSecretKeyEncryptionStrategy(Boolean.parseBoolean(
-        cryptoOpts.get(Property.CRYPTO_OVERRIDE_KEY_STRATEGY_WITH_CONFIGURED_STRATEGY.getKey())));
-    params.setRandomNumberGenerator(cryptoOpts.get(Property.CRYPTO_SECURE_RNG.getKey()));
-    params.setRandomNumberGeneratorProvider(
-        cryptoOpts.get(Property.CRYPTO_SECURE_RNG_PROVIDER.getKey()));
-    params.setSecurityProvider(cryptoOpts.get(Property.CRYPTO_SECURITY_PROVIDER.getKey()));
-    String blockStreamSize = cryptoOpts.get(Property.CRYPTO_BLOCK_STREAM_SIZE.getKey());
-    if (blockStreamSize != null)
-      params.setBlockStreamSize(Integer.parseInt(blockStreamSize));
-
-    return params;
-  }
-
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleParameters.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleParameters.java
deleted file mode 100644
index c4cc3d2..0000000
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleParameters.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.accumulo.core.security.crypto;
-
-import java.io.FilterOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.security.SecureRandom;
-import java.util.Map;
-
-import javax.crypto.Cipher;
-import javax.crypto.CipherOutputStream;
-
-/**
- * This class defines several parameters needed by by a module providing cryptographic stream
- * support in Accumulo. The following Javadoc details which parameters are used for which operations
- * (encryption vs. decryption), which ones return values (i.e. are "out" parameters from the
- * {@link CryptoModule}), and which ones are required versus optional in certain situations.
- *
- * Most of the time, these classes can be constructed using
- * {@link CryptoModuleFactory#createParamsObjectFromAccumuloConfiguration(org.apache.accumulo.core.conf.AccumuloConfiguration)}.
- */
-public class CryptoModuleParameters {
-
-  /**
-   * Gets the name of the symmetric algorithm to use for the creation of encryption keys.
-   *
-   * @see CryptoModuleParameters#setKeyAlgorithmName(String)
-   */
-
-  public String getKeyAlgorithmName() {
-    return keyAlgorithmName;
-  }
-
-  /**
-   * Sets the name of the symmetric algorithm to use for the creation of encryption keys.
-   * <p>
-   * Valid names are names recognized by your cryptographic engine provider. For the default Java
-   * provider, valid names would include things like "AES", "RC4", "DESede", etc.
-   * <p>
-   * For <b>encryption</b>, this value is <b>required</b> and is always used. Its value should be
-   * prepended or otherwise included with the ciphertext for future decryption. <br>
-   * For <b>decryption</b>, this value is often disregarded in favor of the value encoded with the
-   * ciphertext.
-   *
-   * @param keyAlgorithmName
-   *          the name of the cryptographic algorithm to use.
-   * @see <a href=
-   *      "http://docs.oracle.com/javase/1.5.0/docs/guide/security/jce/JCERefGuide.html#AppA">Standard
-   *      Algorithm Names in JCE</a>
-   *
-   */
-
-  public void setKeyAlgorithmName(String keyAlgorithmName) {
-    this.keyAlgorithmName = keyAlgorithmName;
-  }
-
-  /**
-   * Gets the name of the cipher suite used for encryption
-   *
-   * @see CryptoModuleParameters#setCipherSuite(String)
-   */
-
-  public String getCipherSuite() {
-    return cipherSuite;
-  }
-
-  /**
-   * Sets the name of the crypto suite to use for an encryption stream.
-   * <p>
-   * Valid names are names recognized by your cryptographic engine provider.
-   *
-   * The format for input should be: algorithm/mode/padding
-   *
-   * For the default Java provider, valid names would include things like "AES/CBC/NoPadding".
-   * <p>
-   * For <b>encryption</b>, this value is <b>required</b> and is always used. Its value should be
-   * prepended or otherwise included with the ciphertext for future decryption. <br>
-   * For <b>decryption</b>, this value is often disregarded in favor of the value encoded with the
-   * ciphertext.
-   *
-   * @param cipherSuite
-   *          the cipher suite to use.
-   *
-   */
-  public void setCipherSuite(String cipherSuite) {
-    this.cipherSuite = cipherSuite;
-  }
-
-  /**
-   * Gets the plaintext secret key.
-   * <p>
-   * For <b>decryption</b>, this value is often the out parameter of using a secret key encryption
-   * strategy to decrypt an encrypted version of this secret key. (See
-   * {@link CryptoModuleParameters#setKeyEncryptionStrategyClass(String)}.)
-   *
-   *
-   * @see CryptoModuleParameters#setPlaintextKey(byte[])
-   */
-  public byte[] getPlaintextKey() {
-    return plaintextKey;
-  }
-
-  /**
-   * Sets the plaintext secret key that will be used to encrypt and decrypt bytes.
-   * <p>
-   * Valid values and lengths for this secret key depend entirely on the algorithm type. Refer to
-   * the documentation about the algorithm for further information.
-   * <p>
-   * For <b>encryption</b>, this value is <b>optional</b>. If it is not provided, it will be
-   * automatically generated by the underlying cryptographic module. <br>
-   * For <b>decryption</b>, this value is often obtained from the underlying cipher stream, or
-   * derived from the encrypted version of the key (see
-   * {@link CryptoModuleParameters#setEncryptedKey(byte[])}).
-   *
-   * @param plaintextKey
-   *          the value of the plaintext secret key
-   */
-
-  public void setPlaintextKey(byte[] plaintextKey) {
-    this.plaintextKey = plaintextKey;
-  }
-
-  /**
-   * Gets the length of the secret key.
-   *
-   * @see CryptoModuleParameters#setKeyLength(int)
-   */
-  public int getKeyLength() {
-    return keyLength;
-  }
-
-  /**
-   * Sets the length of the secret key that will be used to encrypt and decrypt bytes.
-   * <p>
-   * Valid lengths depend entirely on the algorithm type. Refer to the documentation about the
-   * algorithm for further information. (For example, AES may use either 128 or 256 bit keys in the
-   * default Java cryptography provider.)
-   * <p>
-   * For <b>encryption</b>, this value is <b>required if the secret key is not set</b>. <br>
-   * For <b>decryption</b>, this value is often obtained from the underlying cipher stream, or
-   * derived from the encrypted version of the key (see
-   * {@link CryptoModuleParameters#setEncryptedKey(byte[])}).
-   *
-   * @param keyLength
-   *          the length of the secret key to be generated
-   */
-
-  public void setKeyLength(int keyLength) {
-    this.keyLength = keyLength;
-  }
-
-  /**
-   * Gets the random number generator name.
-   *
-   * @see CryptoModuleParameters#setRandomNumberGenerator(String)
-   */
-
-  public String getRandomNumberGenerator() {
-    return randomNumberGenerator;
-  }
-
-  /**
-   * Sets the name of the random number generator to use. The default for this for the baseline JCE
-   * implementation is "SHA1PRNG".
-   * <p>
-   * For <b>encryption</b>, this value is <b>required</b>.<br>
-   * For <b>decryption</b>, this value is often obtained from the underlying cipher stream.
-   *
-   * @param randomNumberGenerator
-   *          the name of the random number generator to use
-   */
-
-  public void setRandomNumberGenerator(String randomNumberGenerator) {
-    this.randomNumberGenerator = randomNumberGenerator;
-  }
-
-  /**
-   * Gets the random number generator provider name.
-   *
-   * @see CryptoModuleParameters#setRandomNumberGeneratorProvider(String)
-   */
-  public String getRandomNumberGeneratorProvider() {
-    return randomNumberGeneratorProvider;
-  }
-
-  /**
-   * Sets the name of the random number generator provider to use. The default for this for the
-   * baseline JCE implementation is "SUN".
-   * <p>
-   * The provider, as the name implies, provides the RNG implementation specified by
-   * {@link CryptoModuleParameters#getRandomNumberGenerator()}.
-   * <p>
-   * For <b>encryption</b>, this value is <b>required</b>. <br>
-   * For <b>decryption</b>, this value is often obtained from the underlying cipher stream.
-   *
-   * @param randomNumberGeneratorProvider
-   *          the name of the provider to use
-   */
-
-  public void setRandomNumberGeneratorProvider(String randomNumberGeneratorProvider) {
-    this.randomNumberGeneratorProvider = randomNumberGeneratorProvider;
-  }
-
-  /**
-   * Gets the security provider name.
-   *
-   * @see #setSecurityProvider(String)
-   * @return the security provider name
-   */
-  public String getSecurityProvider() {
-    return securityProvider;
-  }
-
-  /**
-   * Sets the name of the security provider to use for crypto.
-   *
-   * @param securityProvider
-   *          the name of the provider to use
-   */
-  public void setSecurityProvider(String securityProvider) {
-    this.securityProvider = securityProvider;
-  }
-
-  /**
-   * Gets the key encryption strategy class.
-   *
-   * @see CryptoModuleParameters#setKeyEncryptionStrategyClass(String)
-   */
-
-  public String getKeyEncryptionStrategyClass() {
-    return keyEncryptionStrategyClass;
-  }
-
-  /**
-   * Sets the class name of the key encryption strategy class. The class obeys the
-   * {@link SecretKeyEncryptionStrategy} interface. It instructs the {@link DefaultCryptoModule} on
-   * how to encrypt the keys it uses to secure the streams.
-   * <p>
-   * The default implementation of this interface, {@link CachingHDFSSecretKeyEncryptionStrategy},
-   * creates a random key encryption key (KEK) as another symmetric key and places the KEK into
-   * HDFS. <i>This is not really very secure.</i> Users of the crypto modules are encouraged to
-   * either safeguard that KEK carefully or to obtain and use another
-   * {@link SecretKeyEncryptionStrategy} class.
-   * <p>
-   * For <b>encryption</b>, this value is <b>optional</b>. If it is not specified, then it assumed
-   * that the secret keys used for encrypting files will not be encrypted. This is not a secure
-   * approach, thus setting this is highly recommended.<br>
-   * For <b>decryption</b>, this value is often obtained from the underlying cipher stream. However,
-   * the underlying stream's value can be overridden (at least when using
-   * {@link DefaultCryptoModule}) by setting the
-   * {@link CryptoModuleParameters#setOverrideStreamsSecretKeyEncryptionStrategy(boolean)} to true.
-   *
-   * @param keyEncryptionStrategyClass
-   *          the name of the key encryption strategy class to use
-   */
-  public void setKeyEncryptionStrategyClass(String keyEncryptionStrategyClass) {
-    this.keyEncryptionStrategyClass = keyEncryptionStrategyClass;
-  }
-
-  /**
-   * Gets the encrypted version of the plaintext key. This parameter is generally either obtained
-   * from an underlying stream or computed in the process of employed the
-   * {@link CryptoModuleParameters#getKeyEncryptionStrategyClass()}.
-   *
-   * @see CryptoModuleParameters#setEncryptedKey(byte[])
-   */
-  public byte[] getEncryptedKey() {
-    return encryptedKey;
-  }
-
-  /**
-   * Sets the encrypted version of the plaintext key
-   * ({@link CryptoModuleParameters#getPlaintextKey()}). Generally this operation will be done
-   * either by:
-   * <ul>
-   * <li>the code reading an encrypted stream and coming across the encrypted version of one of
-   * these keys, OR
-   * <li>the {@link CryptoModuleParameters#getKeyEncryptionStrategyClass()} that encrypted the
-   * plaintext key (see {@link CryptoModuleParameters#getPlaintextKey()}).
-   * </ul>
-   * <p>
-   * For <b>encryption</b>, this value is generally not required, but is usually set by the
-   * underlying module during encryption. <br>
-   * For <b>decryption</b>, this value is <b>usually required</b>.
-   *
-   * @param encryptedKey
-   *          the encrypted value of the plaintext key
-   */
-  public void setEncryptedKey(byte[] encryptedKey) {
-    this.encryptedKey = encryptedKey;
-  }
-
-  /**
-   * Gets the opaque ID associated with the encrypted version of the plaintext key.
-   *
-   * @see CryptoModuleParameters#setOpaqueKeyEncryptionKeyID(String)
-   */
-  public String getOpaqueKeyEncryptionKeyID() {
-    return opaqueKeyEncryptionKeyID;
-  }
-
-  /**
-   * Sets an opaque ID assocaited with the encrypted version of the plaintext key.
-   * <p>
-   * Often, implementors of the {@link SecretKeyEncryptionStrategy} will need to record some
-   * information about how they encrypted a particular plaintext key. For example, if the strategy
-   * employs several keys for its encryption, it will want to record which key it used. The caller
-   * should not have to worry about the format or contents of this internal ID; thus, the strategy
-   * class will encode whatever information it needs into this string. It is then beholden to the
-   * calling code to record this opqaue string properly to the underlying cryptographically-encoded
-   * stream, and then set the opaque ID back into this parameter object upon reading.
-   * <p>
-   * For <b>encryption</b>, this value is generally not required, but will be typically generated
-   * and set by the {@link SecretKeyEncryptionStrategy} class (see
-   * {@link CryptoModuleParameters#getKeyEncryptionStrategyClass()}). <br>
-   * For <b>decryption</b>, this value is <b>required</b>, though it will typically be read from the
-   * underlying stream.
-   *
-   * @param opaqueKeyEncryptionKeyID
-   *          the opaque ID assoicated with the encrypted version of the plaintext key (see
-   *          {@link CryptoModuleParameters#getEncryptedKey()}).
-   */
-
-  public void setOpaqueKeyEncryptionKeyID(String opaqueKeyEncryptionKeyID) {
-    this.opaqueKeyEncryptionKeyID = opaqueKeyEncryptionKeyID;
-  }
-
-  /**
-   * Gets the flag that indicates whether or not the module should record its cryptographic
-   * parameters to the stream automatically, or rely on the calling code to do so.
-   *
-   * @see CryptoModuleParameters#setRecordParametersToStream(boolean)
-   */
-  public boolean getRecordParametersToStream() {
-    return recordParametersToStream;
-  }
-
-  /**
-   * Gets the flag that indicates whether or not the module should record its cryptographic
-   * parameters to the stream automatically, or rely on the calling code to do so.
-   *
-   * <p>
-   *
-   * If this is set to <i>true</i>, then the stream passed to
-   * {@link CryptoModule#getEncryptingOutputStream(CryptoModuleParameters)} will be <i>written to by
-   * the module</i> before it is returned to the caller. There are situations where it is easier to
-   * let the crypto module do this writing on behalf of the caller, and other times where it is not
-   * appropriate (if the format of the underlying stream must be carefully maintained, for
-   * instance).
-   *
-   * @param recordParametersToStream
-   *          whether or not to require the module to record its parameters to the stream by itself
-   */
-  public void setRecordParametersToStream(boolean recordParametersToStream) {
-    this.recordParametersToStream = recordParametersToStream;
-  }
-
-  /**
-   * Gets the flag that indicates whether or not to close the underlying stream when the cipher
-   * stream is closed.
-   *
-   * @see CryptoModuleParameters#setCloseUnderylingStreamAfterCryptoStreamClose(boolean)
-   */
-  public boolean getCloseUnderylingStreamAfterCryptoStreamClose() {
-    return closeUnderylingStreamAfterCryptoStreamClose;
-  }
-
-  /**
-   * Sets the flag that indicates whether or not to close the underlying stream when the cipher
-   * stream is closed.
-   *
-   * <p>
-   *
-   * {@link CipherOutputStream} will only output its padding bytes when its
-   * {@link CipherOutputStream#close()} method is called. However, there are times when a caller
-   * doesn't want its underlying stream closed at the time that the {@link CipherOutputStream} is
-   * closed. This flag indicates that the {@link CryptoModule} should wrap the underlying stream in
-   * a basic {@link FilterOutputStream} which will swallow any close() calls and prevent them from
-   * propogating to the underlying stream.
-   *
-   * @param closeUnderylingStreamAfterCryptoStreamClose
-   *          the flag that indicates whether or not to close the underlying stream when the cipher
-   *          stream is closed
-   */
-  public void setCloseUnderylingStreamAfterCryptoStreamClose(
-      boolean closeUnderylingStreamAfterCryptoStreamClose) {
-    this.closeUnderylingStreamAfterCryptoStreamClose = closeUnderylingStreamAfterCryptoStreamClose;
-  }
-
-  /**
-   * Gets the flag that indicates if the underlying stream's key encryption strategy should be
-   * overridden by the currently configured key encryption strategy.
-   *
-   * @see CryptoModuleParameters#setOverrideStreamsSecretKeyEncryptionStrategy(boolean)
-   */
-  public boolean getOverrideStreamsSecretKeyEncryptionStrategy() {
-    return overrideStreamsSecretKeyEncryptionStrategy;
-  }
-
-  /**
-   * Sets the flag that indicates if the underlying stream's key encryption strategy should be
-   * overridden by the currently configured key encryption strategy.
-   *
-   * <p>
-   *
-   * So, why is this important? Say you started out with the default secret key encryption strategy.
-   * So, now you have a secret key in HDFS that encrypts all the other secret keys. <i>Then</i> you
-   * deploy a key management solution. You want to move that secret key up to the key management
-   * server. Great! No problem. Except, all your encrypted files now contain a setting that says
-   * "hey I was encrypted by the default strategy, so find decrypt my key using that, not the key
-   * management server". This setting signals the {@link CryptoModule} that it should ignore the
-   * setting in the file and prefer the one from the configuration.
-   *
-   * @param overrideStreamsSecretKeyEncryptionStrategy
-   *          the flag that indicates if the underlying stream's key encryption strategy should be
-   *          overridden by the currently configured key encryption strategy
-   */
-
-  public void setOverrideStreamsSecretKeyEncryptionStrategy(
-      boolean overrideStreamsSecretKeyEncryptionStrategy) {
-    this.overrideStreamsSecretKeyEncryptionStrategy = overrideStreamsSecretKeyEncryptionStrategy;
-  }
-
-  /**
-   * Gets the plaintext output stream to wrap for encryption.
-   *
-   * @see CryptoModuleParameters#setPlaintextOutputStream(OutputStream)
-   */
-  public OutputStream getPlaintextOutputStream() {
-    return plaintextOutputStream;
-  }
-
-  /**
-   * Sets the plaintext output stream to wrap for encryption.
-   *
-   * <p>
-   *
-   * For <b>encryption</b>, this parameter is <b>required</b>. <br>
-   * For <b>decryption</b>, this parameter is ignored.
-   */
-  public void setPlaintextOutputStream(OutputStream plaintextOutputStream) {
-    this.plaintextOutputStream = plaintextOutputStream;
-  }
-
-  /**
-   * Gets the encrypted output stream, which is nearly always a wrapped version of the output stream
-   * from {@link CryptoModuleParameters#getPlaintextOutputStream()}.
-   *
-   * <p>
-   *
-   * Generally this method is used by {@link CryptoModule} classes as an <i>out</i> parameter from
-   * calling {@link CryptoModule#getEncryptingOutputStream(CryptoModuleParameters)}.
-   *
-   * @see CryptoModuleParameters#setEncryptedOutputStream(OutputStream)
-   */
-
-  public OutputStream getEncryptedOutputStream() {
-    return encryptedOutputStream;
-  }
-
-  /**
-   * Sets the encrypted output stream. This method should really only be called by
-   * {@link CryptoModule} implementations unless something very unusual is going on.
-   *
-   * @param encryptedOutputStream
-   *          the encrypted version of the stream from output stream from
-   *          {@link CryptoModuleParameters#getPlaintextOutputStream()}.
-   */
-  public void setEncryptedOutputStream(OutputStream encryptedOutputStream) {
-    this.encryptedOutputStream = encryptedOutputStream;
-  }
-
-  /**
-   * Gets the plaintext input stream, which is nearly always a wrapped version of the output from
-   * {@link CryptoModuleParameters#getEncryptedInputStream()}.
-   *
-   * <p>
-   *
-   * Generally this method is used by {@link CryptoModule} classes as an <i>out</i> parameter from
-   * calling {@link CryptoModule#getDecryptingInputStream(CryptoModuleParameters)}.
-   *
-   *
-   * @see CryptoModuleParameters#setPlaintextInputStream(InputStream)
-   */
-  public InputStream getPlaintextInputStream() {
-    return plaintextInputStream;
-  }
-
-  /**
-   * Sets the plaintext input stream, which is nearly always a wrapped version of the output from
-   * {@link CryptoModuleParameters#getEncryptedInputStream()}.
-   *
-   * <p>
-   *
-   * This method should really only be called by {@link CryptoModule} implementations.
-   */
-
-  public void setPlaintextInputStream(InputStream plaintextInputStream) {
-    this.plaintextInputStream = plaintextInputStream;
-  }
-
-  /**
-   * Gets the encrypted input stream to wrap for decryption.
-   *
-   * @see CryptoModuleParameters#setEncryptedInputStream(InputStream)
-   */
-  public InputStream getEncryptedInputStream() {
-    return encryptedInputStream;
-  }
-
-  /**
-   * Sets the encrypted input stream to wrap for decryption.
-   */
-
-  public void setEncryptedInputStream(InputStream encryptedInputStream) {
-    this.encryptedInputStream = encryptedInputStream;
-  }
-
-  /**
-   * Gets the initialized cipher object.
-   *
-   *
-   * @see CryptoModuleParameters#setCipher(Cipher)
-   */
-  public Cipher getCipher() {
-    return cipher;
-  }
-
-  /**
-   * Sets the initialized cipher object. Generally speaking, callers do not have to create and set
-   * this object. There may be circumstances where the cipher object is created outside of the
-   * module (to determine IV lengths, for one). If it is created and you want the module to use the
-   * cipher you already initialized, set it here.
-   *
-   * @param cipher
-   *          the cipher object
-   */
-  public void setCipher(Cipher cipher) {
-    this.cipher = cipher;
-  }
-
-  /**
-   * Gets the initialized secure random object.
-   *
-   * @see CryptoModuleParameters#setSecureRandom(SecureRandom)
-   */
-  public SecureRandom getSecureRandom() {
-    return secureRandom;
-  }
-
-  /**
-   * Sets the initialized secure random object. Generally speaking, callers do not have to create
-   * and set this object. There may be circumstances where the random object is created outside of
-   * the module (for instance, to create a random secret key). If it is created outside the module
-   * and you want the module to use the random object you already created, set it here.
-   *
-   * @param secureRandom
-   *          the {@link SecureRandom} object
-   */
-
-  public void setSecureRandom(SecureRandom secureRandom) {
-    this.secureRandom = secureRandom;
-  }
-
-  /**
-   * Gets the initialization vector to use for this crypto module.
-   *
-   * @see CryptoModuleParameters#setInitializationVector(byte[])
-   */
-  public byte[] getInitializationVector() {
-    return initializationVector;
-  }
-
-  /**
-   * Sets the initialization vector to use for this crypto module.
-   *
-   * <p>
-   *
-   * For <b>encryption</b>, this parameter is <i>optional</i>. If the initialization vector is
-   * created by the caller, for whatever reasons, it can be set here and the crypto module will use
-   * it. <br>
-   *
-   * For <b>decryption</b>, this parameter is <b>required</b>. It should be read from the underlying
-   * stream that contains the encrypted data.
-   *
-   * @param initializationVector
-   *          the initialization vector to use for this crypto operation.
-   */
-  public void setInitializationVector(byte[] initializationVector) {
-    this.initializationVector = initializationVector;
-  }
-
-  /**
-   * Gets the size of the buffering stream that sits above the cipher stream
-   */
-  public int getBlockStreamSize() {
-    return blockStreamSize;
-  }
-
-  /**
-   * Sets the size of the buffering stream that sits above the cipher stream
-   */
-  public void setBlockStreamSize(int blockStreamSize) {
-    this.blockStreamSize = blockStreamSize;
-  }
-
-  /**
-   * Returns the mode from the cipher suite. Assumes the suite is in the form of
-   * algorithm/mode/padding, returns null if the cipher suite is malformed or NullCipher.
-   *
-   * @return the encryption mode from the cipher suite
-   */
-  public String getCipherSuiteEncryptionMode() {
-    String[] parts = this.cipherSuite.split("/");
-    if (parts.length == 3) {
-      return parts[1];
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Updates the initialization vector for use when the encryption mode is GCM. If the IV is not
-   * currently null, and the encryption mode is GCM, it will increment the IV instead of letting the
-   * CryptoModule decide what to do.
-   */
-  public void updateInitializationVector() {
-    if (this.initializationVector != null && getCipherSuiteEncryptionMode()
-        .equals(DefaultCryptoModule.ALGORITHM_PARAMETER_SPEC_GCM)) {
-      incrementIV(this.initializationVector, this.initializationVector.length - 1);
-    } else {
-      this.initializationVector = null;
-    }
-  }
-
-  /**
-   * Because IVs can be longer than longs, this increments arbitrarily sized byte arrays by 1, with
-   * a roll over to 0 after the max value is reached.
-   *
-   * @param iv
-   *          The iv to be incremented
-   * @param i
-   *          The current byte being incremented
-   */
-  static void incrementIV(byte[] iv, int i) {
-    iv[i]++;
-    if (iv[i] == 0) {
-      if (i != 0) {
-        incrementIV(iv, i - 1);
-      } else
-        return;
-    }
-
-  }
-
-  /**
-   * Gets the overall set of options for the {@link CryptoModule}.
-   *
-   * @see CryptoModuleParameters#setAllOptions(Map)
-   */
-  public Map<String,String> getAllOptions() {
-    return allOptions;
-  }
-
-  /**
-   * Sets the overall set of options for the {@link CryptoModule}.
-   *
-   * <p>
-   *
-   * Often, options for the cryptographic modules will be encoded as key/value pairs in a
-   * configuration file. This map represents those values. It may include some of the parameters
-   * already called out as members of this class. It may contain any number of additional parameters
-   * which may be required by different module or key encryption strategy implementations.
-   *
-   * @param allOptions
-   *          the set of key/value pairs that confiure a module, based on a configuration file
-   */
-  public void setAllOptions(Map<String,String> allOptions) {
-    this.allOptions = allOptions;
-  }
-
-  private String cipherSuite = null;
-  private String keyAlgorithmName = null;
-  private byte[] plaintextKey;
-  private int keyLength = 0;
-  private String randomNumberGenerator = null;
-  private String randomNumberGeneratorProvider = null;
-  private String securityProvider = null;
-
-  private String keyEncryptionStrategyClass;
-  private byte[] encryptedKey;
-  private String opaqueKeyEncryptionKeyID;
-
-  private boolean recordParametersToStream = true;
-  private boolean closeUnderylingStreamAfterCryptoStreamClose = true;
-  private boolean overrideStreamsSecretKeyEncryptionStrategy = false;
-
-  private OutputStream plaintextOutputStream;
-  private OutputStream encryptedOutputStream;
-  private InputStream plaintextInputStream;
-  private InputStream encryptedInputStream;
-
-  private Cipher cipher;
-  private SecureRandom secureRandom;
-  private byte[] initializationVector = null;
-
-  private Map<String,String> allOptions;
-  private int blockStreamSize;
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoServiceFactory.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoServiceFactory.java
new file mode 100644
index 0000000..5931bdf
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoServiceFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.crypto.CryptoService.CryptoException;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+
+public class CryptoServiceFactory {
+  /**
+   * Load and initialize the CryptoService only once, when this class is loaded.
+   */
+  private static CryptoService singleton = init();
+
+  private static CryptoService init() {
+    SiteConfiguration conf = SiteConfiguration.getInstance();
+    String configuredClass = conf.get(Property.INSTANCE_CRYPTO_SERVICE.getKey());
+    CryptoService newCryptoService = loadCryptoService(configuredClass);
+    newCryptoService.init(conf.getAllPropertiesWithPrefix(Property.INSTANCE_CRYPTO_PREFIX));
+    return newCryptoService;
+  }
+
+  /**
+   * Get the class configured in {@link Property#INSTANCE_CRYPTO_SERVICE}. This class should have
+   * been loaded and initialized when CryptoServiceFactory is loaded.
+   *
+   * @throws CryptoException
+   *           if class configured differs from the original class loaded
+   */
+  public static CryptoService getConfigured(AccumuloConfiguration conf) {
+    String currentClass = singleton.getClass().getName();
+    String configuredClass = conf.get(Property.INSTANCE_CRYPTO_SERVICE.getKey());
+    if (!currentClass.equals(configuredClass)) {
+      String msg = String.format("Configured crypto class %s changed since initialization of %s.",
+          configuredClass, currentClass);
+      throw new CryptoService.CryptoException(msg);
+    }
+    return singleton;
+  }
+
+  private static CryptoService loadCryptoService(String className) {
+    try {
+      Class<? extends CryptoService> clazz = AccumuloVFSClassLoader.loadClass(className,
+          CryptoService.class);
+      return clazz.newInstance();
+    } catch (Exception e) {
+      throw new CryptoException(e);
+    }
+  }
+
+  /**
+   * This method is only for testing. Do not use.
+   */
+  public static void resetInstance() {
+    singleton = init();
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModuleUtils.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoUtils.java
similarity index 70%
rename from core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModuleUtils.java
rename to core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoUtils.java
index 079c579..e745f0e 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModuleUtils.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoUtils.java
@@ -16,20 +16,30 @@
  */
 package org.apache.accumulo.core.security.crypto;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
 import java.security.NoSuchProviderException;
 import java.security.SecureRandom;
+import java.util.Objects;
 
 import javax.crypto.Cipher;
 import javax.crypto.NoSuchPaddingException;
 import javax.crypto.NullCipher;
 
+import org.apache.accumulo.core.spi.crypto.CryptoService.CryptoException;
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DefaultCryptoModuleUtils {
+public class CryptoUtils {
 
-  private static final Logger log = LoggerFactory.getLogger(DefaultCryptoModuleUtils.class);
+  private static final Logger log = LoggerFactory.getLogger(CryptoUtils.class);
+
+  public static SecureRandom getSha1SecureRandom() {
+    return getSecureRandom("SHA1PRNG", "SUN");
+  }
 
   public static SecureRandom getSecureRandom(String secureRNG, String secureRNGProvider) {
     SecureRandom secureRandom = null;
@@ -43,11 +53,11 @@ public class DefaultCryptoModuleUtils {
     } catch (NoSuchAlgorithmException e) {
       log.error(String.format("Accumulo configuration file specified a secure"
           + " random generator \"%s\" that was not found by any provider.", secureRNG));
-      throw new RuntimeException(e);
+      throw new CryptoException(e);
     } catch (NoSuchProviderException e) {
       log.error(String.format("Accumulo configuration file specified a secure"
           + " random provider \"%s\" that does not exist", secureRNGProvider));
-      throw new RuntimeException(e);
+      throw new CryptoException(e);
     }
     return secureRandom;
   }
@@ -67,21 +77,42 @@ public class DefaultCryptoModuleUtils {
       } catch (NoSuchAlgorithmException e) {
         log.error(String.format("Accumulo configuration file contained a cipher"
             + " suite \"%s\" that was not recognized by any providers", cipherSuite));
-        throw new RuntimeException(e);
+        throw new CryptoException(e);
       } catch (NoSuchPaddingException e) {
         log.error(String.format(
             "Accumulo configuration file contained a"
                 + " cipher, \"%s\" with a padding that was not recognized by any" + " providers",
             cipherSuite));
-        throw new RuntimeException(e);
+        throw new CryptoException(e);
       } catch (NoSuchProviderException e) {
         log.error(String.format(
             "Accumulo configuration file contained a provider, \"%s\" an unrecognized provider",
             securityProvider));
-        throw new RuntimeException(e);
+        throw new CryptoException(e);
       }
     }
     return cipher;
   }
 
+  /**
+   * Read the decryption parameters from the DataInputStream
+   */
+  public static byte[] readParams(DataInputStream in) throws IOException {
+    Objects.requireNonNull(in);
+    int len = in.readInt();
+    byte[] decryptionParams = new byte[len];
+    IOUtils.readFully(in, decryptionParams);
+    return decryptionParams;
+  }
+
+  /**
+   * Write the decryption parameters to the DataOutputStream
+   */
+  public static void writeParams(byte[] decryptionParams, DataOutputStream out) throws IOException {
+    Objects.requireNonNull(decryptionParams);
+    Objects.requireNonNull(out);
+    out.writeInt(decryptionParams.length);
+    out.write(decryptionParams);
+  }
+
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
deleted file mode 100644
index d209307..0000000
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.security.crypto;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PushbackInputStream;
-import java.security.InvalidAlgorithmParameterException;
-import java.security.InvalidKeyException;
-import java.security.SecureRandom;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.crypto.Cipher;
-import javax.crypto.CipherInputStream;
-import javax.crypto.spec.GCMParameterSpec;
-import javax.crypto.spec.IvParameterSpec;
-import javax.crypto.spec.SecretKeySpec;
-
-import org.apache.accumulo.core.conf.Property;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class implements the {@link CryptoModule} interface, defining how calling applications can
- * receive encrypted input and output streams. While the default implementation given here allows
- * for a lot of flexibility in terms of choices of algorithm, key encryption strategies, and so on,
- * some Accumulo users may choose to swap out this implementation for others, and can base their
- * implementation details off of this class's work.
- *
- * In general, the module is quite straightforward: provide it with crypto-related settings and an
- * input/output stream, and it will hand back those streams wrapped in encrypting (or decrypting)
- * streams.
- */
-public class DefaultCryptoModule implements CryptoModule {
-
-  private static final String ENCRYPTION_HEADER_MARKER_V1 = "---Log File Encrypted (v1)---";
-  private static final String ENCRYPTION_HEADER_MARKER_V2 = "---Log File Encrypted (v2)---";
-  public static final String ALGORITHM_PARAMETER_SPEC_GCM = "GCM";
-
-  // 128-bit tags are the longest available for GCM
-  private static final Integer GCM_TAG_LENGTH_IN_BYTES = 16;
-
-  /*
-   * According to NIST Special Publication 800-38D, Section 5.2.1.1: "For IVs, it is recommended
-   * that implementations restrict support to the length of 96 bits, to promote interoperability,
-   * efficiency, and simplicity of design"
-   */
-  private static final Integer GCM_IV_LENGTH_IN_BYTES = 12;
-
-  private static final Logger log = LoggerFactory.getLogger(DefaultCryptoModule.class);
-
-  @Override
-  public CryptoModuleParameters initializeCipher(CryptoModuleParameters params) {
-
-    log.trace(String.format(
-        "Using cipher suite \"%s\" with key length %d with"
-            + " RNG \"%s\" and RNG provider \"%s\" and key encryption strategy" + " \"%s\"",
-        params.getCipherSuite(), params.getKeyLength(), params.getRandomNumberGenerator(),
-        params.getRandomNumberGeneratorProvider(), params.getKeyEncryptionStrategyClass()));
-
-    if (params.getSecureRandom() == null) {
-      SecureRandom secureRandom = DefaultCryptoModuleUtils.getSecureRandom(
-          params.getRandomNumberGenerator(), params.getRandomNumberGeneratorProvider());
-      params.setSecureRandom(secureRandom);
-    }
-
-    Cipher cipher = DefaultCryptoModuleUtils.getCipher(params.getCipherSuite(),
-        params.getSecurityProvider());
-
-    if (params.getInitializationVector() == null) {
-      if (params.getCipherSuiteEncryptionMode().equals(ALGORITHM_PARAMETER_SPEC_GCM)) {
-        byte[] gcmIV = new byte[GCM_IV_LENGTH_IN_BYTES];
-        params.getSecureRandom().nextBytes(gcmIV);
-        params.setInitializationVector(gcmIV);
-      }
-    }
-
-    try {
-      initCipher(params, cipher, Cipher.ENCRYPT_MODE);
-    } catch (InvalidKeyException e) {
-      log.error("Accumulo encountered an unknown error in generating the secret"
-          + " key object (SecretKeySpec) for an encrypted stream");
-      throw new RuntimeException(e);
-    } catch (InvalidAlgorithmParameterException e) {
-      log.error("Accumulo encountered an unknown error in setting up the"
-          + " initialization vector for an encrypted stream");
-      throw new RuntimeException(e);
-    }
-
-    params.setCipher(cipher);
-
-    return params;
-
-  }
-
-  private boolean validateNotEmpty(String givenValue, boolean allIsWell, StringBuilder buf,
-      String errorMessage) {
-    if (givenValue == null || givenValue.equals("")) {
-      buf.append(errorMessage);
-      buf.append("\n");
-      return false;
-    }
-
-    return allIsWell;
-  }
-
-  private boolean validateNotNull(Object givenValue, boolean allIsWell, StringBuilder buf,
-      String errorMessage) {
-    if (givenValue == null) {
-      buf.append(errorMessage);
-      buf.append("\n");
-      return false;
-    }
-
-    return allIsWell;
-  }
-
-  private boolean validateNotZero(int givenValue, boolean allIsWell, StringBuilder buf,
-      String errorMessage) {
-    if (givenValue == 0) {
-      buf.append(errorMessage);
-      buf.append("\n");
-      return false;
-    }
-
-    return allIsWell;
-  }
-
-  private boolean validateParamsObject(CryptoModuleParameters params, int cipherMode) {
-
-    if (cipherMode == Cipher.ENCRYPT_MODE) {
-
-      StringBuilder errorBuf = new StringBuilder("The following problems were"
-          + " found with the CryptoModuleParameters object you provided for an"
-          + " encrypt operation:\n");
-      boolean allIsWell = true;
-
-      allIsWell = validateNotEmpty(params.getCipherSuite(), allIsWell, errorBuf,
-          "No cipher suite was specified.");
-
-      if (allIsWell && params.getCipherSuite().equals("NullCipher")) {
-        return true;
-      }
-
-      allIsWell = validateNotZero(params.getKeyLength(), allIsWell, errorBuf,
-          "No key length was specified.");
-      allIsWell = validateNotEmpty(params.getKeyAlgorithmName(), allIsWell, errorBuf,
-          "No key algorithm name was specified.");
-      allIsWell = validateNotEmpty(params.getRandomNumberGenerator(), allIsWell, errorBuf,
-          "No random number generator was specified.");
-      allIsWell = validateNotEmpty(params.getRandomNumberGeneratorProvider(), allIsWell, errorBuf,
-          "No random number generate provider was specified.");
-      allIsWell = validateNotNull(params.getPlaintextOutputStream(), allIsWell, errorBuf,
-          "No plaintext output stream was specified.");
-
-      if (!allIsWell) {
-        log.error("CryptoModulesParameters object is not valid.");
-        log.error(errorBuf.toString());
-        throw new RuntimeException("CryptoModulesParameters object is not valid.");
-      }
-
-      return allIsWell;
-
-    } else if (cipherMode == Cipher.DECRYPT_MODE) {
-      StringBuilder errorBuf = new StringBuilder("The following problems were"
-          + " found with the CryptoModuleParameters object you provided for a"
-          + " decrypt operation:\n");
-      boolean allIsWell = true;
-
-      allIsWell = validateNotZero(params.getKeyLength(), allIsWell, errorBuf,
-          "No key length was specified.");
-      allIsWell = validateNotEmpty(params.getRandomNumberGenerator(), allIsWell, errorBuf,
-          "No random number generator was specified.");
-      allIsWell = validateNotEmpty(params.getRandomNumberGeneratorProvider(), allIsWell, errorBuf,
-          "No random number generate provider was specified.");
-      allIsWell = validateNotNull(params.getEncryptedInputStream(), allIsWell, errorBuf,
-          "No encrypted input stream was specified.");
-      allIsWell = validateNotNull(params.getInitializationVector(), allIsWell, errorBuf,
-          "No initialization vector was specified.");
-      allIsWell = validateNotNull(params.getEncryptedKey(), allIsWell, errorBuf,
-          "No encrypted key was specified.");
-
-      if (params.getKeyEncryptionStrategyClass() != null
-          && !params.getKeyEncryptionStrategyClass().equals("NullSecretKeyEncryptionStrategy")) {
-        allIsWell = validateNotEmpty(params.getOpaqueKeyEncryptionKeyID(), allIsWell, errorBuf,
-            "No opqaue key encryption ID was specified.");
-      }
-
-      if (!allIsWell) {
-        log.error("CryptoModulesParameters object is not valid.");
-        log.error(errorBuf.toString());
-        throw new RuntimeException("CryptoModulesParameters object is not valid.");
-      }
-
-      return allIsWell;
-
-    }
-
-    return false;
-  }
-
-  @Override
-  public CryptoModuleParameters getEncryptingOutputStream(CryptoModuleParameters params)
-      throws IOException {
-
-    log.trace("Initializing crypto output stream (new style)");
-
-    boolean allParamsOK = validateParamsObject(params, Cipher.ENCRYPT_MODE);
-    if (!allParamsOK) {
-      // This would be weird because the above call should throw an exception, but if they don't
-      // we'll check and throw.
-
-      log.error("CryptoModuleParameters was not valid.");
-      throw new RuntimeException("Invalid CryptoModuleParameters");
-    }
-
-    // If they want a null output stream, just return their plaintext stream as the encrypted stream
-    if (params.getCipherSuite().equals("NullCipher")) {
-      params.setEncryptedOutputStream(params.getPlaintextOutputStream());
-      return params;
-    }
-
-    // Get the secret key
-    if (params.getPlaintextKey() == null) {
-      params = generateNewRandomSessionKey(params);
-    }
-
-    // Encrypt the secret key
-
-    SecretKeyEncryptionStrategy keyEncryptionStrategy = CryptoModuleFactory
-        .getSecretKeyEncryptionStrategy(params.getKeyEncryptionStrategyClass());
-    params = keyEncryptionStrategy.encryptSecretKey(params);
-
-    // Now the encrypted version of the key and any opaque ID are within the params object.
-    // Initialize the cipher.
-
-    // Check if the caller wants us to close the downstream stream when close() is called on the
-    // cipher object. Calling close() on a CipherOutputStream is necessary for it to write out
-    // padding bytes.
-    if (!params.getCloseUnderylingStreamAfterCryptoStreamClose()) {
-      params.setPlaintextOutputStream(
-          new DiscardCloseOutputStream(params.getPlaintextOutputStream()));
-    }
-
-    Cipher cipher = params.getCipher();
-    if (cipher == null) {
-      initializeCipher(params);
-      cipher = params.getCipher();
-    }
-
-    if (0 == cipher.getBlockSize()) {
-      throw new RuntimeException("Encryption cipher must be a block cipher");
-    }
-
-    RFileCipherOutputStream cipherOutputStream = new RFileCipherOutputStream(
-        params.getPlaintextOutputStream(), cipher);
-    BlockedOutputStream blockedOutputStream = new BlockedOutputStream(cipherOutputStream,
-        cipher.getBlockSize(), params.getBlockStreamSize());
-
-    params.setEncryptedOutputStream(blockedOutputStream);
-
-    if (params.getRecordParametersToStream()) {
-      DataOutputStream dataOut = new DataOutputStream(params.getPlaintextOutputStream());
-
-      // Write a marker to indicate this is an encrypted log file (in case we read it a plain one
-      // and need to
-      // not try to decrypt it. Can happen during a failure when the log's encryption settings are
-      // changing.
-      dataOut.writeUTF(ENCRYPTION_HEADER_MARKER_V2);
-
-      // Write out all the parameters
-      dataOut.writeInt(params.getAllOptions().size());
-      for (String key : params.getAllOptions().keySet()) {
-        dataOut.writeUTF(key);
-        dataOut.writeUTF(params.getAllOptions().get(key));
-      }
-
-      // Write out the cipher suite and algorithm used to encrypt this file. In case the admin
-      // changes, we want to still
-      // decode the old format.
-      dataOut.writeUTF(params.getCipherSuite());
-      dataOut.writeUTF(params.getKeyAlgorithmName());
-
-      // Write the init vector to the log file
-      dataOut.writeInt(params.getInitializationVector().length);
-      dataOut.write(params.getInitializationVector());
-
-      // Write out the encrypted session key and the opaque ID
-      dataOut.writeUTF(params.getOpaqueKeyEncryptionKeyID());
-      dataOut.writeInt(params.getEncryptedKey().length);
-      dataOut.write(params.getEncryptedKey());
-      dataOut.writeInt(params.getBlockStreamSize());
-    }
-
-    return params;
-  }
-
-  @Override
-  public CryptoModuleParameters getDecryptingInputStream(CryptoModuleParameters params)
-      throws IOException {
-    log.trace("About to initialize decryption stream (new style)");
-
-    if (params.getRecordParametersToStream()) {
-      DataInputStream dataIn = new DataInputStream(params.getEncryptedInputStream());
-      log.trace("About to read encryption parameters from underlying stream");
-
-      String marker = dataIn.readUTF();
-      if (marker.equals(ENCRYPTION_HEADER_MARKER_V1)
-          || marker.equals(ENCRYPTION_HEADER_MARKER_V2)) {
-
-        Map<String,String> paramsFromFile = new HashMap<>();
-
-        // Read in the bulk of parameters
-        int paramsCount = dataIn.readInt();
-        for (int i = 0; i < paramsCount; i++) {
-          String key = dataIn.readUTF();
-          String value = dataIn.readUTF();
-
-          paramsFromFile.put(key, value);
-        }
-
-        // Set the cipher parameters
-        String cipherSuiteFromFile = dataIn.readUTF();
-        String algorithmNameFromFile = dataIn.readUTF();
-        params.setCipherSuite(cipherSuiteFromFile);
-        params.setKeyAlgorithmName(algorithmNameFromFile);
-
-        // Read the secret key and initialization vector from the file
-        int initVectorLength = dataIn.readInt();
-        byte[] initVector = new byte[initVectorLength];
-        dataIn.readFully(initVector);
-
-        params.setInitializationVector(initVector);
-
-        // Read the opaque ID and encrypted session key
-        String opaqueId = dataIn.readUTF();
-        params.setOpaqueKeyEncryptionKeyID(opaqueId);
-
-        int encryptedSecretKeyLength = dataIn.readInt();
-        byte[] encryptedSecretKey = new byte[encryptedSecretKeyLength];
-        dataIn.readFully(encryptedSecretKey);
-        params.setEncryptedKey(encryptedSecretKey);
-
-        if (params.getOverrideStreamsSecretKeyEncryptionStrategy()) {
-          // Merge in options from file selectively
-          for (String name : paramsFromFile.keySet()) {
-            if (!name.equals(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey())) {
-              params.getAllOptions().put(name, paramsFromFile.get(name));
-            }
-          }
-          params.setKeyEncryptionStrategyClass(params.getAllOptions()
-              .get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey()));
-        } else {
-          params = CryptoModuleFactory.fillParamsObjectFromStringMap(params, paramsFromFile);
-        }
-
-        SecretKeyEncryptionStrategy keyEncryptionStrategy = CryptoModuleFactory
-            .getSecretKeyEncryptionStrategy(params.getKeyEncryptionStrategyClass());
-
-        params = keyEncryptionStrategy.decryptSecretKey(params);
-
-        if (marker.equals(ENCRYPTION_HEADER_MARKER_V2))
-          params.setBlockStreamSize(dataIn.readInt());
-        else
-          params.setBlockStreamSize(0);
-      } else {
-
-        log.trace("Read something off of the encrypted input stream that was"
-            + " not the encryption header marker, so pushing back bytes and"
-            + " returning the given stream");
-        // Push these bytes back on to the stream. This method is a bit roundabout but isolates our
-        // code from having to understand the format that DataOuputStream uses for its bytes.
-        ByteArrayOutputStream tempByteOut = new ByteArrayOutputStream();
-        DataOutputStream tempOut = new DataOutputStream(tempByteOut);
-        tempOut.writeUTF(marker);
-
-        byte[] bytesToPutBack = tempByteOut.toByteArray();
-
-        PushbackInputStream pushbackStream = new PushbackInputStream(
-            params.getEncryptedInputStream(), bytesToPutBack.length);
-        pushbackStream.unread(bytesToPutBack);
-
-        params.setPlaintextInputStream(pushbackStream);
-
-        return params;
-      }
-    }
-
-    // We validate here after reading parameters from the stream, not at the top of the function.
-    boolean allParamsOK = validateParamsObject(params, Cipher.DECRYPT_MODE);
-
-    if (!allParamsOK) {
-      log.error("CryptoModuleParameters object failed validation for decrypt");
-      throw new RuntimeException("CryptoModuleParameters object failed validation for decrypt");
-    }
-
-    Cipher cipher = DefaultCryptoModuleUtils.getCipher(params.getCipherSuite(),
-        params.getSecurityProvider());
-
-    try {
-      initCipher(params, cipher, Cipher.DECRYPT_MODE);
-    } catch (InvalidKeyException e) {
-      log.error("Error when trying to initialize cipher with secret key");
-      throw new RuntimeException(e);
-    } catch (InvalidAlgorithmParameterException e) {
-      log.error("Error when trying to initialize cipher with initialization vector");
-      throw new RuntimeException(e);
-    }
-
-    InputStream blockedDecryptingInputStream = new CipherInputStream(
-        params.getEncryptedInputStream(), cipher);
-
-    if (params.getBlockStreamSize() > 0)
-      blockedDecryptingInputStream = new BlockedInputStream(blockedDecryptingInputStream,
-          cipher.getBlockSize(), params.getBlockStreamSize());
-
-    log.trace("Initialized cipher input stream with transformation [{}]", params.getCipherSuite());
-
-    params.setPlaintextInputStream(blockedDecryptingInputStream);
-
-    return params;
-  }
-
-  /**
-   *
-   * @param params
-   *          the crypto parameters
-   * @param cipher
-   *          the Java Cipher object to be init'd
-   * @param opMode
-   *          encrypt or decrypt
-   * @throws InvalidKeyException
-   *           if the crypto params are missing necessary values
-   * @throws InvalidAlgorithmParameterException
-   *           if an invalid algorithm is chosen
-   */
-  private void initCipher(CryptoModuleParameters params, Cipher cipher, int opMode)
-      throws InvalidKeyException, InvalidAlgorithmParameterException {
-    if (params.getCipherSuiteEncryptionMode().equals(ALGORITHM_PARAMETER_SPEC_GCM)) {
-      cipher.init(opMode, new SecretKeySpec(params.getPlaintextKey(), params.getKeyAlgorithmName()),
-          new GCMParameterSpec(GCM_TAG_LENGTH_IN_BYTES * 8, params.getInitializationVector()));
-    } else {
-      if (params.getInitializationVector() == null) {
-        cipher.init(opMode,
-            new SecretKeySpec(params.getPlaintextKey(), params.getKeyAlgorithmName()),
-            params.getSecureRandom());
-        params.setInitializationVector(cipher.getIV());
-      } else {
-        cipher.init(opMode,
-            new SecretKeySpec(params.getPlaintextKey(), params.getKeyAlgorithmName()),
-            new IvParameterSpec(params.getInitializationVector()));
-      }
-    }
-  }
-
-  @Override
-  public CryptoModuleParameters generateNewRandomSessionKey(CryptoModuleParameters params) {
-
-    if (params.getSecureRandom() == null) {
-      params.setSecureRandom(DefaultCryptoModuleUtils.getSecureRandom(
-          params.getRandomNumberGenerator(), params.getRandomNumberGeneratorProvider()));
-    }
-    byte[] newSessionKey = new byte[params.getKeyLength() / 8];
-
-    params.getSecureRandom().nextBytes(newSessionKey);
-    params.setPlaintextKey(newSessionKey);
-
-    return params;
-  }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/NonCachingSecretKeyEncryptionStrategy.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/NonCachingSecretKeyEncryptionStrategy.java
deleted file mode 100644
index 7e2cef2..0000000
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/NonCachingSecretKeyEncryptionStrategy.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.accumulo.core.security.crypto;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.security.InvalidKeyException;
-import java.security.Key;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-
-import javax.crypto.Cipher;
-import javax.crypto.IllegalBlockSizeException;
-import javax.crypto.spec.SecretKeySpec;
-
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-//TODO ACCUMULO-2530 Update properties to use a URI instead of a relative path to secret key
-public class NonCachingSecretKeyEncryptionStrategy implements SecretKeyEncryptionStrategy {
-
-  private static final Logger log = LoggerFactory
-      .getLogger(NonCachingSecretKeyEncryptionStrategy.class);
-
-  private void doKeyEncryptionOperation(int encryptionMode, CryptoModuleParameters params,
-      String pathToKeyName, Path pathToKey, FileSystem fs) throws IOException {
-    DataInputStream in = null;
-    try {
-      if (!fs.exists(pathToKey)) {
-
-        if (encryptionMode == Cipher.UNWRAP_MODE) {
-          log.error("There was a call to decrypt the session key but no key"
-              + " encryption key exists. Either restore it, reconfigure the conf"
-              + " file to point to it in HDFS, or throw the affected data away and"
-              + " begin again.");
-          throw new RuntimeException(
-              "Could not find key encryption key file in configured location in HDFS ("
-                  + pathToKeyName + ")");
-        } else {
-          DataOutputStream out = null;
-          try {
-            out = fs.create(pathToKey);
-            // Very important, lets hedge our bets
-            fs.setReplication(pathToKey, (short) 5);
-            SecureRandom random = DefaultCryptoModuleUtils.getSecureRandom(
-                params.getRandomNumberGenerator(), params.getRandomNumberGeneratorProvider());
-            int keyLength = params.getKeyLength();
-            byte[] newRandomKeyEncryptionKey = new byte[keyLength / 8];
-            random.nextBytes(newRandomKeyEncryptionKey);
-            out.writeInt(newRandomKeyEncryptionKey.length);
-            out.write(newRandomKeyEncryptionKey);
-            out.flush();
-          } finally {
-            if (out != null) {
-              out.close();
-            }
-          }
-
-        }
-      }
-      in = fs.open(pathToKey);
-
-      int keyEncryptionKeyLength = in.readInt();
-      byte[] keyEncryptionKey = new byte[keyEncryptionKeyLength];
-      int bytesRead = in.read(keyEncryptionKey);
-
-      Cipher cipher = DefaultCryptoModuleUtils.getCipher(
-          params.getAllOptions().get(Property.CRYPTO_DEFAULT_KEY_STRATEGY_CIPHER_SUITE.getKey()),
-          params.getSecurityProvider());
-
-      // check if the number of bytes read into the array is the same as the value of the length
-      // field,
-      if (bytesRead == keyEncryptionKeyLength) {
-        try {
-          cipher.init(encryptionMode,
-              new SecretKeySpec(keyEncryptionKey, params.getKeyAlgorithmName()));
-        } catch (InvalidKeyException e) {
-          log.error("{}", e.getMessage(), e);
-          throw new RuntimeException(e);
-        }
-
-        if (Cipher.UNWRAP_MODE == encryptionMode) {
-          try {
-            Key plaintextKey = cipher.unwrap(params.getEncryptedKey(), params.getKeyAlgorithmName(),
-                Cipher.SECRET_KEY);
-            params.setPlaintextKey(plaintextKey.getEncoded());
-          } catch (InvalidKeyException | NoSuchAlgorithmException e) {
-            log.error("{}", e.getMessage(), e);
-            throw new RuntimeException(e);
-          }
-        } else {
-          Key plaintextKey = new SecretKeySpec(params.getPlaintextKey(),
-              params.getKeyAlgorithmName());
-          try {
-            byte[] encryptedSecretKey = cipher.wrap(plaintextKey);
-            params.setEncryptedKey(encryptedSecretKey);
-            params.setOpaqueKeyEncryptionKeyID(pathToKeyName);
-          } catch (InvalidKeyException | IllegalBlockSizeException e) {
-            log.error("{}", e.getMessage(), e);
-            throw new RuntimeException(e);
-          }
-
-        }
-      } else {
-        log.error("{}", "Error:bytesRead does not match EncryptionkeyLength");
-        throw new IllegalArgumentException("Error:bytesRead does not match EncryptionkeyLength");
-      }
-    } finally {
-      if (in != null) {
-        in.close();
-      }
-    }
-  }
-
-  @SuppressWarnings("deprecation")
-  private String getFullPathToKey(CryptoModuleParameters params) {
-    String pathToKeyName = params.getAllOptions()
-        .get(Property.CRYPTO_DEFAULT_KEY_STRATEGY_KEY_LOCATION.getKey());
-    String instanceDirectory = params.getAllOptions().get(Property.INSTANCE_DFS_DIR.getKey());
-
-    if (pathToKeyName == null) {
-      pathToKeyName = Property.CRYPTO_DEFAULT_KEY_STRATEGY_KEY_LOCATION.getDefaultValue();
-    }
-
-    if (instanceDirectory == null) {
-      instanceDirectory = Property.INSTANCE_DFS_DIR.getDefaultValue();
-    }
-
-    if (!pathToKeyName.startsWith("/")) {
-      pathToKeyName = "/" + pathToKeyName;
-    }
-
-    return instanceDirectory + pathToKeyName;
-  }
-
-  @SuppressWarnings("deprecation")
-  @Override
-  public CryptoModuleParameters encryptSecretKey(CryptoModuleParameters params) {
-    String hdfsURI = params.getAllOptions().get(Property.INSTANCE_DFS_URI.getKey());
-    if (hdfsURI == null) {
-      hdfsURI = Property.INSTANCE_DFS_URI.getDefaultValue();
-    }
-
-    String fullPath = getFullPathToKey(params);
-    Path pathToKey = new Path(fullPath);
-
-    try {
-      // TODO ACCUMULO-2530 Ensure volumes a properly supported
-      FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
-      doKeyEncryptionOperation(Cipher.WRAP_MODE, params, fullPath, pathToKey, fs);
-
-    } catch (IOException e) {
-      log.error("{}", e.getMessage(), e);
-      throw new RuntimeException(e);
-    }
-
-    return params;
-  }
-
-  @SuppressWarnings("deprecation")
-  @Override
-  public CryptoModuleParameters decryptSecretKey(CryptoModuleParameters params) {
-    String hdfsURI = params.getAllOptions().get(Property.INSTANCE_DFS_URI.getKey());
-    if (hdfsURI == null) {
-      hdfsURI = Property.INSTANCE_DFS_URI.getDefaultValue();
-    }
-
-    String pathToKeyName = getFullPathToKey(params);
-    Path pathToKey = new Path(pathToKeyName);
-
-    try {
-      // TODO ACCUMULO-2530 Ensure volumes a properly supported
-      FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
-      doKeyEncryptionOperation(Cipher.UNWRAP_MODE, params, pathToKeyName, pathToKey, fs);
-
-    } catch (IOException e) {
-      log.error("{}", e.getMessage(), e);
-      throw new RuntimeException(e);
-    }
-
-    return params;
-  }
-
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategy.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategy.java
deleted file mode 100644
index 547c47f..0000000
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategy.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.security.crypto;
-
-import java.io.IOException;
-
-public interface SecretKeyEncryptionStrategy {
-
-  CryptoModuleParameters encryptSecretKey(CryptoModuleParameters params) throws IOException;
-
-  CryptoModuleParameters decryptSecretKey(CryptoModuleParameters params);
-
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/AESCryptoService.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/AESCryptoService.java
new file mode 100644
index 0000000..e9d16cf
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/AESCryptoService.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.security.crypto.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.Key;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.IvParameterSpec;
+
+import org.apache.accumulo.core.security.crypto.CryptoUtils;
+import org.apache.accumulo.core.security.crypto.streams.BlockedInputStream;
+import org.apache.accumulo.core.security.crypto.streams.BlockedOutputStream;
+import org.apache.accumulo.core.security.crypto.streams.DiscardCloseOutputStream;
+import org.apache.accumulo.core.security.crypto.streams.RFileCipherOutputStream;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.crypto.FileDecrypter;
+import org.apache.accumulo.core.spi.crypto.FileEncrypter;
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Example implementation of AES encryption for Accumulo
+ */
+public class AESCryptoService implements CryptoService {
+
+  // Hard coded NoCryptoService.VERSION - this permits the removal of NoCryptoService from the
+  // core jar, allowing use of only one crypto service
+  private static final String NO_CRYPTO_VERSION = "U+1F47B";
+
+  private Key encryptingKek = null;
+  private String encryptingKekId = null;
+  private String encryptingKeyManager = null;
+  // Lets just load keks for reading once
+  private static HashMap<String,Key> decryptingKeys = new HashMap<>();
+
+  @Override
+  public void init(Map<String,String> conf) throws CryptoException {
+    String kekId = conf.get("instance.crypto.opts.kekId");
+    String keyMgr = conf.get("instance.crypto.opts.keyManager");
+    Objects.requireNonNull(kekId, "Config property instance.crypto.opts.kekId is required.");
+    Objects.requireNonNull(keyMgr, "Config property instance.crypto.opts.keyManager is required.");
+    switch (keyMgr) {
+      case KeyManager.URI:
+        this.encryptingKeyManager = keyMgr;
+        this.encryptingKekId = kekId;
+        this.encryptingKek = KeyManager.loadKekFromUri(kekId);
+        break;
+      default:
+        throw new CryptoException("Unrecognized key manager");
+    }
+
+  }
+
+  @Override
+  public FileEncrypter getFileEncrypter(CryptoEnvironment environment) {
+    CryptoModule cm;
+    switch (environment.getScope()) {
+      case WAL:
+        cm = new AESCBCCryptoModule(this.encryptingKek, this.encryptingKekId,
+            this.encryptingKeyManager);
+        return cm.getEncrypter();
+
+      case RFILE:
+        cm = new AESGCMCryptoModule(this.encryptingKek, this.encryptingKekId,
+            this.encryptingKeyManager);
+        return cm.getEncrypter();
+
+      default:
+        throw new CryptoException("Unknown scope: " + environment.getScope());
+    }
+  }
+
+  @Override
+  public FileDecrypter getFileDecrypter(CryptoEnvironment environment) {
+    CryptoModule cm;
+    byte[] decryptionParams = environment.getDecryptionParams();
+    if (decryptionParams == null || checkNoCrypto(decryptionParams))
+      return new NoFileDecrypter();
+
+    ParsedCryptoParameters parsed = parseCryptoParameters(decryptionParams);
+    Key kek = loadDecryptionKek(parsed);
+    Key fek = KeyManager.unwrapKey(parsed.getEncFek(), kek);
+    switch (parsed.getCryptoServiceVersion()) {
+      case AESCBCCryptoModule.VERSION:
+        cm = new AESCBCCryptoModule(this.encryptingKek, this.encryptingKekId,
+            this.encryptingKeyManager);
+        return (cm.getDecrypter(fek));
+      case AESGCMCryptoModule.VERSION:
+        cm = new AESGCMCryptoModule(this.encryptingKek, this.encryptingKekId,
+            this.encryptingKeyManager);
+        return (cm.getDecrypter(fek));
+      default:
+        throw new CryptoException(
+            "Unknown crypto module version: " + parsed.getCryptoServiceVersion());
+    }
+  }
+
+  private static boolean checkNoCrypto(byte[] params) {
+    byte[] noCryptoBytes = NO_CRYPTO_VERSION.getBytes(Charset.forName("UTF-8"));
+    return Arrays.equals(params, noCryptoBytes);
+  }
+
+  static class ParsedCryptoParameters {
+    String cryptoServiceName;
+    String cryptoServiceVersion;
+    String keyManagerVersion;
+    String kekId;
+    byte[] encFek;
+
+    public String getCryptoServiceName() {
+      return cryptoServiceName;
+    }
+
+    public void setCryptoServiceName(String cryptoServiceName) {
+      this.cryptoServiceName = cryptoServiceName;
+    }
+
+    public String getCryptoServiceVersion() {
+      return cryptoServiceVersion;
+    }
+
+    public void setCryptoServiceVersion(String cryptoServiceVersion) {
+      this.cryptoServiceVersion = cryptoServiceVersion;
+    }
+
+    public String getKeyManagerVersion() {
+      return keyManagerVersion;
+    }
+
+    public void setKeyManagerVersion(String keyManagerVersion) {
+      this.keyManagerVersion = keyManagerVersion;
+    }
+
+    public String getKekId() {
+      return kekId;
+    }
+
+    public void setKekId(String kekId) {
+      this.kekId = kekId;
+    }
+
+    public byte[] getEncFek() {
+      return encFek;
+    }
+
+    public void setEncFek(byte[] encFek) {
+      this.encFek = encFek;
+    }
+
+  }
+
+  private static byte[] createCryptoParameters(String version, Key encryptingKek,
+      String encryptingKekId, String encryptingKeyManager, Key fek) {
+
+    byte[] bytes;
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream params = new DataOutputStream(baos)) {
+      params.writeUTF(AESCryptoService.class.getName());
+      params.writeUTF(version);
+      params.writeUTF(encryptingKeyManager);
+      params.writeUTF(encryptingKekId);
+      byte[] wrappedFek = KeyManager.wrapKey(fek, encryptingKek);
+      params.writeInt(wrappedFek.length);
+      params.write(wrappedFek);
+
+      bytes = baos.toByteArray();
+    } catch (IOException e) {
+      throw new CryptoException("Error creating crypto params", e);
+    }
+    return bytes;
+  }
+
+  private static ParsedCryptoParameters parseCryptoParameters(byte[] parameters) {
+    ParsedCryptoParameters parsed = new ParsedCryptoParameters();
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(parameters);
+        DataInputStream params = new DataInputStream(bais)) {
+      parsed.setCryptoServiceName(params.readUTF());
+      parsed.setCryptoServiceVersion(params.readUTF());
+      parsed.setKeyManagerVersion(params.readUTF());
+      parsed.setKekId(params.readUTF());
+      int encFekLen = params.readInt();
+      byte[] encFek = new byte[encFekLen];
+      int bytesRead = params.read(encFek);
+      if (bytesRead != encFekLen)
+        throw new CryptoException("Incorrect number of bytes read for encrypted FEK");
+      parsed.setEncFek(encFek);
+    } catch (IOException e) {
+      throw new CryptoException("Error creating crypto params", e);
+    }
+    return parsed;
+  }
+
+  private static Key loadDecryptionKek(ParsedCryptoParameters params) {
+    Key ret = null;
+    String keyTag = params.getKeyManagerVersion() + "!" + params.getKekId();
+    if (decryptingKeys.get(keyTag) != null) {
+      return (decryptingKeys.get(keyTag));
+    }
+
+    switch (params.keyManagerVersion) {
+      case KeyManager.URI:
+        ret = KeyManager.loadKekFromUri(params.kekId);
+        break;
+      default:
+        throw new CryptoException("Unable to load kek: " + params.kekId);
+    }
+
+    decryptingKeys.put(keyTag, ret);
+
+    if (ret == null)
+      throw new CryptoException("Unable to load decryption KEK");
+
+    return (ret);
+  }
+
+  private static SecureRandom getSecureRandom(String secureRNG, String secureRNGProvider) {
+    SecureRandom secureRandom = null;
+    try {
+      secureRandom = SecureRandom.getInstance(secureRNG, secureRNGProvider);
+
+      // Immediately seed the generator
+      byte[] throwAway = new byte[16];
+      secureRandom.nextBytes(throwAway);
+
+    } catch (NoSuchAlgorithmException | NoSuchProviderException e) {
+      throw new CryptoException("Unable to generate secure random.", e);
+    }
+    return secureRandom;
+  }
+
+  /**
+   * This interface lists the methods needed by CryptoModules which are responsible for tracking
+   * version and preparing encrypters/decrypters for use.
+   */
+  private interface CryptoModule {
+    FileEncrypter getEncrypter();
+
+    FileDecrypter getDecrypter(Key fek);
+  }
+
+  public static class AESGCMCryptoModule implements CryptoModule {
+    private static final String VERSION = "U+1F43B"; // unicode bear emoji rawr
+
+    private final Integer GCM_IV_LENGTH_IN_BYTES = 12;
+    private final Integer KEY_LENGTH_IN_BYTES = 16;
+
+    // 128-bit tags are the longest available for GCM
+    private final Integer GCM_TAG_LENGTH_IN_BITS = 16 * 8;
+    private final String transformation = "AES/GCM/NoPadding";
+    private boolean ivReused = false;
+    private Key encryptingKek;
+    private String encryptingKekId;
+    private String encryptingKeyManager;
+
+    public AESGCMCryptoModule(Key encryptingKek, String encryptingKekId,
+        String encryptingKeyManager) {
+      this.encryptingKek = encryptingKek;
+      this.encryptingKekId = encryptingKekId;
+      this.encryptingKeyManager = encryptingKeyManager;
+    }
+
+    @Override
+    public FileEncrypter getEncrypter() {
+      return new AESGCMFileEncrypter();
+    }
+
+    @Override
+    public FileDecrypter getDecrypter(Key fek) {
+      return new AESGCMFileDecrypter(fek);
+    }
+
+    public class AESGCMFileEncrypter implements FileEncrypter {
+
+      private byte[] firstInitVector;
+      private SecureRandom sr = getSecureRandom("SHA1PRNG", "SUN");
+      private Key fek = KeyManager.generateKey(sr, KEY_LENGTH_IN_BYTES);
+      private byte[] initVector = new byte[GCM_IV_LENGTH_IN_BYTES];
+
+      AESGCMFileEncrypter() {
+
+        sr.nextBytes(initVector);
+        firstInitVector = Arrays.copyOf(initVector, initVector.length);
+      }
+
+      @Override
+      public OutputStream encryptStream(OutputStream outputStream) throws CryptoException {
+        if (ivReused) {
+          throw new CryptoException(
+              "Key/IV reuse is forbidden in AESGCMCryptoModule. Too many RBlocks.");
+        }
+        incrementIV(initVector, initVector.length - 1);
+        if (Arrays.equals(initVector, firstInitVector)) {
+          ivReused = true; // This will allow us to write the final block, since the
+          // initialization vector
+          // is always incremented before use.
+        }
+
+        // write IV before encrypting
+        try {
+          outputStream.write(initVector);
+        } catch (IOException e) {
+          throw new CryptoException("Unable to write IV to stream", e);
+        }
+
+        Cipher cipher;
+        try {
+          cipher = Cipher.getInstance(transformation);
+          cipher.init(Cipher.ENCRYPT_MODE, fek,
+              new GCMParameterSpec(GCM_TAG_LENGTH_IN_BITS, initVector));
+        } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException
+            | InvalidAlgorithmParameterException e) {
+          throw new CryptoException("Unable to initialize cipher", e);
+        }
+
+        RFileCipherOutputStream cos = new RFileCipherOutputStream(
+            new DiscardCloseOutputStream(outputStream), cipher);
+        // Prevent underlying stream from being closed with DiscardCloseOutputStream
+        // Without this, when the crypto stream is closed (in order to flush its last bytes)
+        // the underlying RFile stream will *also* be closed, and that's undesirable as the
+        // cipher
+        // stream is closed for every block written.
+        return new BlockedOutputStream(cos, cipher.getBlockSize(), 1024);
+      }
+
+      /**
+       * Because IVs can be longer than longs, this increments arbitrarily sized byte arrays by 1,
+       * with a roll over to 0 after the max value is reached.
+       *
+       * @param iv
+       *          The iv to be incremented
+       * @param i
+       *          The current byte being incremented
+       */
+      void incrementIV(byte[] iv, int i) {
+        iv[i]++;
+        if (iv[i] == 0) {
+          if (i != 0) {
+            incrementIV(iv, i - 1);
+          } else
+            return;
+        }
+
+      }
+
+      @Override
+      public byte[] getDecryptionParameters() {
+        return createCryptoParameters(VERSION, encryptingKek, encryptingKekId, encryptingKeyManager,
+            fek);
+      }
+    }
+
+    public class AESGCMFileDecrypter implements FileDecrypter {
+      private Key fek;
+
+      AESGCMFileDecrypter(Key fek) {
+        this.fek = fek;
+      }
+
+      @Override
+      public InputStream decryptStream(InputStream inputStream) throws CryptoException {
+        byte[] initVector = new byte[GCM_IV_LENGTH_IN_BYTES];
+        try {
+          IOUtils.readFully(inputStream, initVector);
+        } catch (IOException e) {
+          throw new CryptoException("Unable to read IV from stream", e);
+        }
+
+        Cipher cipher;
+        try {
+          cipher = Cipher.getInstance(transformation);
+          cipher.init(Cipher.DECRYPT_MODE, fek,
+              new GCMParameterSpec(GCM_TAG_LENGTH_IN_BITS, initVector));
+
+        } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException
+            | InvalidAlgorithmParameterException e) {
+          throw new CryptoException("Unable to initialize cipher", e);
+        }
+
+        CipherInputStream cis = new CipherInputStream(inputStream, cipher);
+        return new BlockedInputStream(cis, cipher.getBlockSize(), 1024);
+      }
+    }
+  }
+
+  public static class AESCBCCryptoModule implements CryptoModule {
+    public static final String VERSION = "U+1f600"; // unicode grinning face emoji
+    private final Integer IV_LENGTH_IN_BYTES = 16;
+    private final Integer KEY_LENGTH_IN_BYTES = 16;
+    private final String transformation = "AES/CBC/NoPadding";
+    private Key encryptingKek;
+    private String encryptingKekId;
+    private String encryptingKeyManager;
+
+    public AESCBCCryptoModule(Key encryptingKek, String encryptingKekId,
+        String encryptingKeyManager) {
+      this.encryptingKek = encryptingKek;
+      this.encryptingKekId = encryptingKekId;
+      this.encryptingKeyManager = encryptingKeyManager;
+    }
+
+    @Override
+    public FileEncrypter getEncrypter() {
+      return new AESCBCFileEncrypter();
+    }
+
+    @Override
+    public FileDecrypter getDecrypter(Key fek) {
+      return new AESCBCFileDecrypter(fek);
+    }
+
+    public class AESCBCFileEncrypter implements FileEncrypter {
+
+      private SecureRandom sr = getSecureRandom("SHA1PRNG", "SUN");
+      private Key fek = KeyManager.generateKey(sr, KEY_LENGTH_IN_BYTES);
+      private byte[] initVector = new byte[IV_LENGTH_IN_BYTES];
+
+      @Override
+      public OutputStream encryptStream(OutputStream outputStream) throws CryptoException {
+
+        CryptoUtils.getSha1SecureRandom().nextBytes(initVector);
+        try {
+          outputStream.write(initVector);
+        } catch (IOException e) {
+          throw new CryptoException("Unable to write IV to stream", e);
+        }
+
+        Cipher cipher;
+        try {
+          cipher = Cipher.getInstance(transformation);
+          cipher.init(Cipher.ENCRYPT_MODE, fek, new IvParameterSpec(initVector));
+        } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException
+            | InvalidAlgorithmParameterException e) {
+          throw new CryptoException("Unable to initialize cipher", e);
+        }
+
+        CipherOutputStream cos = new CipherOutputStream(new DiscardCloseOutputStream(outputStream),
+            cipher);
+        // Prevent underlying stream from being closed with DiscardCloseOutputStream
+        // Without this, when the crypto stream is closed (in order to flush its last bytes)
+        // the underlying RFile stream will *also* be closed, and that's undesirable as the
+        // cipher
+        // stream is closed for every block written.
+        return new BlockedOutputStream(cos, cipher.getBlockSize(), 1024);
+      }
+
+      @Override
+      public byte[] getDecryptionParameters() {
+        return createCryptoParameters(VERSION, encryptingKek, encryptingKekId, encryptingKeyManager,
+            fek);
+      }
+    }
+
+    public class AESCBCFileDecrypter implements FileDecrypter {
+      private Key fek;
+
+      AESCBCFileDecrypter(Key fek) {
+        this.fek = fek;
+      }
+
+      @Override
+      public InputStream decryptStream(InputStream inputStream) throws CryptoException {
+        byte[] initVector = new byte[IV_LENGTH_IN_BYTES];
+        try {
+          IOUtils.readFully(inputStream, initVector);
+        } catch (IOException e) {
+          throw new CryptoException("Unable to read IV from stream", e);
+        }
+
+        Cipher cipher;
+        try {
+          cipher = Cipher.getInstance(transformation);
+          cipher.init(Cipher.DECRYPT_MODE, fek, new IvParameterSpec(initVector));
+        } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException
+            | InvalidAlgorithmParameterException e) {
+          throw new CryptoException("Unable to initialize cipher", e);
+        }
+
+        CipherInputStream cis = new CipherInputStream(inputStream, cipher);
+        return new BlockedInputStream(cis, cipher.getBlockSize(), 1024);
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/CryptoEnvironmentImpl.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/CryptoEnvironmentImpl.java
new file mode 100644
index 0000000..577d833
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/CryptoEnvironmentImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.security.crypto.impl;
+
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+
+/**
+ * @since 2.0
+ */
+public class CryptoEnvironmentImpl implements CryptoEnvironment {
+
+  private Scope scope;
+  private byte[] decryptionParams;
+
+  public CryptoEnvironmentImpl(Scope scope, byte[] decryptionParams) {
+    this.scope = scope;
+    this.decryptionParams = decryptionParams;
+  }
+
+  public Scope getScope() {
+    return this.scope;
+  }
+
+  public byte[] getDecryptionParams() {
+    return decryptionParams;
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/KeyManager.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/KeyManager.java
new file mode 100644
index 0000000..f24d24e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/KeyManager.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.security.crypto.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.Key;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.SecureRandom;
+
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.accumulo.core.spi.crypto.CryptoService.CryptoException;
+
+public class KeyManager {
+
+  public static final String URI = "uri";
+
+  public static Key generateKey(SecureRandom sr, int size) {
+    byte[] bytes = new byte[size];
+    sr.nextBytes(bytes);
+    return new SecretKeySpec(bytes, "AES");
+  }
+
+  public static Key unwrapKey(byte[] fek, Key kek) {
+    Key result = null;
+    try {
+      Cipher c = Cipher.getInstance("AESWrap", "SunJCE");
+      c.init(Cipher.UNWRAP_MODE, kek);
+      result = c.unwrap(fek, "AES", Cipher.SECRET_KEY);
+    } catch (InvalidKeyException | NoSuchAlgorithmException | NoSuchProviderException
+        | NoSuchPaddingException e) {
+      throw new CryptoException("Unable to unwrap file encryption key", e);
+    }
+    return result;
+  }
+
+  public static byte[] wrapKey(Key fek, Key kek) {
+    byte[] result = null;
+    try {
+      Cipher c = Cipher.getInstance("AESWrap", "SunJCE");
+      c.init(Cipher.WRAP_MODE, kek);
+      result = c.wrap(fek);
+    } catch (InvalidKeyException | NoSuchAlgorithmException | NoSuchProviderException
+        | NoSuchPaddingException | IllegalBlockSizeException e) {
+      throw new CryptoException("Unable to wrap file encryption key", e);
+    }
+
+    return result;
+  }
+
+  public static SecretKeySpec loadKekFromUri(String keyId) {
+    URI uri;
+    SecretKeySpec key = null;
+    try {
+      uri = new URI(keyId);
+      key = new SecretKeySpec(Files.readAllBytes(Paths.get(uri.getPath())), "AES");
+    } catch (URISyntaxException | IOException e) {
+      throw new CryptoException("Unable to load key encryption key.", e);
+    }
+
+    return key;
+
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/NoCryptoService.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/NoCryptoService.java
new file mode 100644
index 0000000..53fd1d5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/NoCryptoService.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.security.crypto.impl;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.crypto.FileDecrypter;
+import org.apache.accumulo.core.spi.crypto.FileEncrypter;
+
+/**
+ * The default encryption strategy which does nothing.
+ */
+public class NoCryptoService implements CryptoService {
+  public static final String VERSION = "U+1F47B"; // unicode ghost emoji
+
+  @Override
+  public void init(Map<String,String> conf) throws CryptoException {
+    // do nothing
+  }
+
+  @Override
+  public FileEncrypter getFileEncrypter(CryptoEnvironment environment) {
+    return new NoFileEncrypter();
+  }
+
+  @Override
+  public FileDecrypter getFileDecrypter(CryptoEnvironment environment) {
+    return new NoFileDecrypter();
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/NoFileDecrypter.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/NoFileDecrypter.java
new file mode 100644
index 0000000..6ccd3bd
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/NoFileDecrypter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.security.crypto.impl;
+
+import java.io.InputStream;
+
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.crypto.FileDecrypter;
+
+public class NoFileDecrypter implements FileDecrypter {
+  @Override
+  public InputStream decryptStream(InputStream inputStream) throws CryptoService.CryptoException {
+    return inputStream;
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/NoFileEncrypter.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/NoFileEncrypter.java
new file mode 100644
index 0000000..525c0e4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/impl/NoFileEncrypter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.security.crypto.impl;
+
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.crypto.FileEncrypter;
+
+public class NoFileEncrypter implements FileEncrypter {
+
+  @Override
+  public OutputStream encryptStream(OutputStream outputStream)
+      throws CryptoService.CryptoException {
+    return outputStream;
+  }
+
+  @Override
+  public byte[] getDecryptionParameters() {
+    return NoCryptoService.VERSION.getBytes(Charset.forName("UTF-8"));
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedInputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/streams/BlockedInputStream.java
similarity index 98%
rename from core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedInputStream.java
rename to core/src/main/java/org/apache/accumulo/core/security/crypto/streams/BlockedInputStream.java
index e5209d8..c9ec439 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedInputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/streams/BlockedInputStream.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.core.security.crypto;
+package org.apache.accumulo.core.security.crypto.streams;
 
 import java.io.DataInputStream;
 import java.io.EOFException;
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/streams/BlockedOutputStream.java
similarity index 97%
rename from core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
rename to core/src/main/java/org/apache/accumulo/core/security/crypto/streams/BlockedOutputStream.java
index e43911d..6a013b8 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/streams/BlockedOutputStream.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.core.security.crypto;
+package org.apache.accumulo.core.security.crypto.streams;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -92,7 +92,7 @@ public class BlockedOutputStream extends OutputStream {
       len -= remaining;
     }
     // And then write the remainder (and this is guaranteed to not fill the buffer, so we won't
-    // flush afteward
+    // flush afterward
     bb.put(b, off, len);
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/DiscardCloseOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/streams/DiscardCloseOutputStream.java
similarity index 95%
rename from core/src/main/java/org/apache/accumulo/core/security/crypto/DiscardCloseOutputStream.java
rename to core/src/main/java/org/apache/accumulo/core/security/crypto/streams/DiscardCloseOutputStream.java
index 2d31be4..cf0db70 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/DiscardCloseOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/streams/DiscardCloseOutputStream.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.accumulo.core.security.crypto;
+package org.apache.accumulo.core.security.crypto.streams;
 
 import java.io.FilterOutputStream;
 import java.io.IOException;
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/streams/NoFlushOutputStream.java
similarity index 94%
rename from core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java
rename to core/src/main/java/org/apache/accumulo/core/security/crypto/streams/NoFlushOutputStream.java
index 17fc06a..1fa2af7 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/streams/NoFlushOutputStream.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.core.security.crypto;
+package org.apache.accumulo.core.security.crypto.streams;
 
 import java.io.DataOutputStream;
 import java.io.OutputStream;
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/RFileCipherOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/streams/RFileCipherOutputStream.java
similarity index 98%
rename from core/src/main/java/org/apache/accumulo/core/security/crypto/RFileCipherOutputStream.java
rename to core/src/main/java/org/apache/accumulo/core/security/crypto/streams/RFileCipherOutputStream.java
index 270a02e..1ef0c87 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/RFileCipherOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/streams/RFileCipherOutputStream.java
@@ -14,7 +14,7 @@
  * see the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.core.security.crypto;
+package org.apache.accumulo.core.security.crypto.streams;
 
 import java.io.IOException;
 import java.io.OutputStream;
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoEnvironment.java b/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoEnvironment.java
new file mode 100644
index 0000000..8753e8d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoEnvironment.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.spi.crypto;
+
+/**
+ * Useful information provided to the crypto implementation
+ *
+ * @since 2.0
+ */
+public interface CryptoEnvironment {
+  /**
+   * Where in Accumulo the on-disk file encryption takes place.
+   */
+  enum Scope {
+    WAL, RFILE;
+  }
+
+  Scope getScope();
+
+  byte[] getDecryptionParams();
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java b/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java
new file mode 100644
index 0000000..4236d1a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.spi.crypto;
+
+import java.util.Map;
+
+/**
+ * Self contained cryptographic service. All on disk encryption and decryption will take place
+ * through this interface. Each implementation must implement a {@link FileEncrypter} for encryption
+ * and a {@link FileDecrypter} for decryption.
+ *
+ * @since 2.0
+ */
+public interface CryptoService {
+
+  /**
+   * Initialize CryptoService. This is called once at Tablet Server startup.
+   */
+  void init(Map<String,String> conf) throws CryptoException;
+
+  /**
+   * Initialize the FileEncrypter for the environment and return. This will get called once per
+   * R-File or Write Ahead Log. FileEncrypter implementation must be thread safe.
+   */
+  FileEncrypter getFileEncrypter(CryptoEnvironment environment);
+
+  /**
+   * Initialize the FileDecrypter for the environment and return. This will get called once per
+   * R-File or Write Ahead Log. FileDecrypter implementation must be thread safe.
+   */
+  FileDecrypter getFileDecrypter(CryptoEnvironment environment);
+
+  /**
+   * Runtime Crypto exception
+   */
+  class CryptoException extends RuntimeException {
+
+    private static final long serialVersionUID = -7588781060677839664L;
+
+    public CryptoException() {
+      super();
+    }
+
+    public CryptoException(String message) {
+      super(message);
+    }
+
+    public CryptoException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public CryptoException(Throwable cause) {
+      super(cause);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileDecrypter.java b/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileDecrypter.java
new file mode 100644
index 0000000..bf28226
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileDecrypter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.spi.crypto;
+
+import java.io.InputStream;
+
+/**
+ * Class implementation that will decrypt a file. Make sure implementation is thread safe.
+ *
+ * @since 2.0
+ */
+public interface FileDecrypter {
+  /**
+   * Decrypt the InputStream
+   */
+  InputStream decryptStream(InputStream inputStream) throws CryptoService.CryptoException;
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java b/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java
new file mode 100644
index 0000000..254365a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.spi.crypto;
+
+import java.io.OutputStream;
+
+/**
+ * Class implementation that will encrypt a file. Make sure implementation is thread safe.
+ *
+ * @since 2.0
+ */
+public interface FileEncrypter {
+  /**
+   * Encrypt the OutputStream.
+   */
+  OutputStream encryptStream(OutputStream outputStream) throws CryptoService.CryptoException;
+
+  /**
+   * Get all the parameters required for decryption. WARNING: This byte[] will get written as part
+   * of the OutputStream as it is returned (either before or after the encrypted data). Do not
+   * return any unencrypted sensitive information.
+   *
+   * For example, return information about the encryption taking place such as version, class name
+   * or a wrapped File Encryption Key. This information will get written at the beginning of an
+   * encrypted Write Ahead Log (WAL) or at the end of an encrypted R-File. Later, it will be read
+   * from the file and passed to the {@link FileDecrypter} as part of {@link CryptoEnvironment} for
+   * everything it needs for decryption.
+   */
+  byte[] getDecryptionParameters();
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 4f124e6..7fa7b38 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -57,6 +57,7 @@ import org.apache.accumulo.core.data.thrift.TSummaryRequest;
 import org.apache.accumulo.core.metadata.schema.MetadataScanner;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
@@ -665,7 +666,9 @@ public class Gatherer {
       List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache) {
     Path path = new Path(file);
     Configuration conf = CachedConfiguration.getInstance();
-    return SummaryReader.load(volMgr.get(path), conf, ctx.getConfiguration(), factory, path,
-        summarySelector, summaryCache, indexCache).getSummaries(ranges);
+    return SummaryReader
+        .load(volMgr.get(path), conf, ctx.getConfiguration(), factory, path, summarySelector,
+            summaryCache, indexCache, CryptoServiceFactory.getConfigured(ctx.getConfiguration()))
+        .getSummaries(ranges);
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index 8fb8807..eb49fbb 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.CacheEntry;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.summary.Gatherer.RowRange;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -173,24 +174,26 @@ public class SummaryReader {
 
   public static SummaryReader load(Configuration conf, AccumuloConfiguration aConf,
       InputStream inputStream, long length, Predicate<SummarizerConfiguration> summarySelector,
-      SummarizerFactory factory) throws IOException {
+      SummarizerFactory factory, CryptoService cryptoService) throws IOException {
     // @formatter:off
     org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.Reader bcReader =
-      new CachableBlockFile.Reader((InputStream & Seekable) inputStream, length, conf, aConf);
+      new CachableBlockFile.Reader((InputStream & Seekable) inputStream, length, conf, aConf,
+              cryptoService);
     // @formatter:on
     return load(bcReader, summarySelector, factory);
   }
 
   public static SummaryReader load(FileSystem fs, Configuration conf, AccumuloConfiguration aConf,
       SummarizerFactory factory, Path file, Predicate<SummarizerConfiguration> summarySelector,
-      BlockCache summaryCache, BlockCache indexCache) {
+      BlockCache summaryCache, BlockCache indexCache, CryptoService cryptoService) {
     CachableBlockFile.Reader bcReader = null;
 
     try {
       // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when
       // only summary data is wanted.
       CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache);
-      bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf);
+      bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf,
+          cryptoService);
       return load(bcReader, summarySelector, factory);
     } catch (FileNotFoundException fne) {
       SummaryReader sr = new SummaryReader();
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/ConfigSanityCheckTest.java b/core/src/test/java/org/apache/accumulo/core/conf/ConfigSanityCheckTest.java
index 9ae175b..14c6c11 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/ConfigSanityCheckTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/ConfigSanityCheckTest.java
@@ -25,12 +25,6 @@ import org.junit.Test;
 public class ConfigSanityCheckTest {
   private Map<String,String> m;
 
-  // These are used when a valid class is needed for testing
-  private static final String PROPS_PREFIX = "org.apache.accumulo.core.security.crypto.";
-  private static final String DEFAULT_CRYPTO_MODULE = PROPS_PREFIX + "DefaultCryptoModule";
-  private static final String DEFAULT_SECRET_KEY_ENCRYPTION_STRATEGY = PROPS_PREFIX
-      + "NonCachingSecretKeyEncryptionStrategy";
-
   @Before
   public void setUp() {
     m = new java.util.HashMap<>();
@@ -78,102 +72,15 @@ public class ConfigSanityCheckTest {
   }
 
   @Test(expected = SanityCheckException.class)
-  public void testFail_cipherSuiteSetKeyAlgorithmNotSet() {
-    m.put(Property.CRYPTO_CIPHER_SUITE.getKey(), "AES/CBC/NoPadding");
-    m.put(Property.CRYPTO_CIPHER_KEY_ALGORITHM_NAME.getKey(), "NullCipher");
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test(expected = SanityCheckException.class)
-  public void testFail_cipherSuiteNotSetKeyAlgorithmSet() {
-    m.put(Property.CRYPTO_CIPHER_SUITE.getKey(), "NullCipher");
-    m.put(Property.CRYPTO_CIPHER_KEY_ALGORITHM_NAME.getKey(), "AES");
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test(expected = SanityCheckException.class)
-  public void testFail_cryptoModuleInvalidClass() {
-    // a random hex dump is unlikely to be a real class name
-    m.put(Property.CRYPTO_MODULE_CLASS.getKey(), "e0218734bcd1e4d239203f970806786b");
-    m.put(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey(),
-        DEFAULT_SECRET_KEY_ENCRYPTION_STRATEGY);
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test(expected = SanityCheckException.class)
-  public void testFail_cryptoModuleValidClassNotValidInterface() {
-    m.put(Property.CRYPTO_MODULE_CLASS.getKey(), "java.lang.String");
-    m.put(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey(),
-        DEFAULT_SECRET_KEY_ENCRYPTION_STRATEGY);
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test
-  public void testPass_cryptoModuleAndSecretKeyEncryptionStrategyValidClasses() {
-    m.put(Property.CRYPTO_MODULE_CLASS.getKey(), DEFAULT_CRYPTO_MODULE);
-    m.put(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey(),
-        DEFAULT_SECRET_KEY_ENCRYPTION_STRATEGY);
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test
-  public void testPass_cryptoModuleValidNullModule() {
-    m.put(Property.CRYPTO_MODULE_CLASS.getKey(), "NullCryptoModule");
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test(expected = SanityCheckException.class)
-  public void testFail_secretKeyEncryptionStrategyInvalidClass() {
-    // a random hex dump is unlikely to be a real class name
-    m.put(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey(),
-        "e0218734bcd1e4d239203f970806786b");
-    m.put(Property.CRYPTO_MODULE_CLASS.getKey(), DEFAULT_CRYPTO_MODULE);
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test(expected = SanityCheckException.class)
-  public void testFail_secretKeyEncryptionStrategyValidClassNotValidInterface() {
-    m.put(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey(), "java.lang.String");
-    m.put(Property.CRYPTO_MODULE_CLASS.getKey(), DEFAULT_CRYPTO_MODULE);
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test
-  public void testPass_secretKeyEncryptionStrategyValidNullStrategy() {
-    m.put(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey(),
-        "NullSecretKeyEncryptionStrategy");
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test(expected = SanityCheckException.class)
-  public void testFail_cryptoModuleSetSecretKeyEncryptionStrategyNotSet() {
-    m.put(Property.CRYPTO_MODULE_CLASS.getKey(), DEFAULT_CRYPTO_MODULE);
-    m.put(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey(),
-        "NullSecretKeyEncryptionStrategy");
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test(expected = SanityCheckException.class)
-  public void testFail_cryptoModuleNotSetSecretKeyEncryptionStrategySet() {
-    m.put(Property.CRYPTO_MODULE_CLASS.getKey(), "NullCryptoModule");
-    m.put(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey(),
-        DEFAULT_SECRET_KEY_ENCRYPTION_STRATEGY);
-    ConfigSanityCheck.validate(m.entrySet());
-  }
-
-  @Test
-  public void testPass_cryptoModuleAndSecretKeyEncryptionStrategyBothNull() {
-    m.put(Property.CRYPTO_MODULE_CLASS.getKey(), "NullCryptoModule");
-    m.put(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey(),
-        "NullSecretKeyEncryptionStrategy");
+  public void testFail_badCryptoService() {
+    m.put(Property.INSTANCE_CRYPTO_SERVICE.getKey(), "DoesNotExistCryptoService");
     ConfigSanityCheck.validate(m.entrySet());
   }
 
   @Test
-  public void testPass_cryptoModuleAndSecretKeyEncryptionStrategyBothSet() {
-    m.put(Property.CRYPTO_MODULE_CLASS.getKey(), DEFAULT_CRYPTO_MODULE);
-    m.put(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey(),
-        DEFAULT_SECRET_KEY_ENCRYPTION_STRATEGY);
+  public void testPass_defaultCryptoService() {
+    m.put(Property.INSTANCE_CRYPTO_SERVICE.getKey(),
+        Property.INSTANCE_CRYPTO_SERVICE.getDefaultValue());
     ConfigSanityCheck.validate(m.entrySet());
   }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
index 086e8b9..6088010 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
@@ -19,12 +19,14 @@ package org.apache.accumulo.core.file.rfile;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,8 +58,9 @@ public class CreateCompatTestFile {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
-    BCFile.Writer _cbw = new BCFile.Writer(fs.create(new Path(args[0])), null, "gz", conf,
-        DefaultConfiguration.getInstance());
+    AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
+    BCFile.Writer _cbw = new BCFile.Writer(fs.create(new Path(args[0])), null, "gz", conf, aconf,
+        CryptoServiceFactory.getConfigured(aconf));
     RFile.Writer writer = new RFile.Writer(_cbw, 1000);
 
     writer.startNewLocalityGroup("lg1",
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index dd316a4..d1ff2a7 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
 import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -56,7 +57,7 @@ public class MultiLevelIndexTest extends TestCase {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
     BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", CachedConfiguration.getInstance(),
-        aconf);
+        aconf, CryptoServiceFactory.getConfigured(aconf));
 
     BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
 
@@ -77,7 +78,7 @@ public class MultiLevelIndexTest extends TestCase {
     SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
     FSDataInputStream in = new FSDataInputStream(bais);
     CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length,
-        CachedConfiguration.getInstance(), aconf);
+        CachedConfiguration.getInstance(), aconf, CryptoServiceFactory.getConfigured(aconf));
 
     Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8);
     CachableBlockFile.CachedBlockRead rootIn = _cbr.getMetaBlock("root");
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index ca19a1f..89ca96c 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -51,6 +51,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.commons.lang.mutable.MutableInt;
@@ -147,7 +148,8 @@ public class MultiThreadedRFileTest {
       FileSystem fs = FileSystem.newInstance(conf);
       Path path = new Path("file://" + rfile);
       dos = fs.create(path, true);
-      BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration);
+      BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration,
+          CryptoServiceFactory.getConfigured(accumuloConfiguration));
       SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl
           .newSamplerConfig(accumuloConfiguration);
       Sampler sampler = null;
@@ -177,10 +179,11 @@ public class MultiThreadedRFileTest {
     public void openReader() throws IOException {
       FileSystem fs = FileSystem.newInstance(conf);
       Path path = new Path("file://" + rfile);
+      AccumuloConfiguration defaultConf = DefaultConfiguration.getInstance();
 
       // the caches used to obfuscate the multithreaded issues
       CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, null, null,
-          DefaultConfiguration.getInstance());
+          defaultConf, CryptoServiceFactory.getConfigured(defaultConf));
       reader = new RFile.Reader(_cbr);
       iter = new ColumnFamilySkippingIterator(reader);
 
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index e95baba..51b3bdf 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -53,6 +53,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
@@ -75,7 +76,10 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.security.crypto.CryptoTest;
+import org.apache.accumulo.core.security.crypto.impl.AESCryptoService;
+import org.apache.accumulo.core.security.crypto.impl.NoCryptoService;
 import org.apache.accumulo.core.spi.cache.BlockCacheManager;
 import org.apache.accumulo.core.spi.cache.CacheType;
 import org.apache.accumulo.core.util.CachedConfiguration;
@@ -86,7 +90,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -123,6 +129,16 @@ public class RFileTest {
   public TemporaryFolder tempFolder = new TemporaryFolder(
       new File(System.getProperty("user.dir") + "/target"));
 
+  @BeforeClass
+  public static void setupCryptoKeyFile() throws Exception {
+    CryptoTest.setupKeyFile();
+  }
+
+  @AfterClass
+  public static void removeCryptoKeyFile() throws Exception {
+    CryptoTest.cleanupKeyFile();
+  }
+
   static class SeekableByteArrayInputStream extends ByteArrayInputStream
       implements Seekable, PositionedReadable {
 
@@ -233,7 +249,8 @@ public class RFileTest {
     public void openWriter(boolean startDLG, int blockSize) throws IOException {
       baos = new ByteArrayOutputStream();
       dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
-      BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration);
+      BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration,
+          CryptoServiceFactory.getConfigured(accumuloConfiguration));
 
       SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl
           .newSamplerConfig(accumuloConfiguration);
@@ -295,7 +312,8 @@ public class RFileTest {
       LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA);
 
       CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in, fileLength, conf,
-          dataCache, indexCache, DefaultConfiguration.getInstance());
+          dataCache, indexCache, accumuloConfiguration,
+          CryptoServiceFactory.getConfigured(accumuloConfiguration));
       reader = new RFile.Reader(_cbr);
       if (cfsi)
         iter = new ColumnFamilySkippingIterator(reader);
@@ -1691,18 +1709,30 @@ public class RFileTest {
 
   @Test(expected = NullPointerException.class)
   public void testMissingUnreleasedVersions() throws Exception {
-    runVersionTest(5);
+    runVersionTest(5, DefaultConfiguration.getInstance());
   }
 
   @Test
   public void testOldVersions() throws Exception {
-    runVersionTest(3);
-    runVersionTest(4);
-    runVersionTest(6);
-    runVersionTest(7);
+    AccumuloConfiguration defaultConfiguration = DefaultConfiguration.getInstance();
+    runVersionTest(3, defaultConfiguration);
+    runVersionTest(4, defaultConfiguration);
+    runVersionTest(6, defaultConfiguration);
+    runVersionTest(7, defaultConfiguration);
+  }
+
+  @Test
+  public void testOldVersionsWithCrypto() throws Exception {
+    turnCryptoOnInSiteConfig();
+    AccumuloConfiguration cryptoOnConf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
+    runVersionTest(3, cryptoOnConf);
+    runVersionTest(4, cryptoOnConf);
+    runVersionTest(6, cryptoOnConf);
+    runVersionTest(7, cryptoOnConf);
+    turnCryptoOffInSiteConfig();
   }
 
-  private void runVersionTest(int version) throws IOException {
+  private void runVersionTest(int version, AccumuloConfiguration aconf) throws IOException {
     InputStream in = this.getClass().getClassLoader()
         .getResourceAsStream("org/apache/accumulo/core/file/rfile/ver_" + version + ".rf");
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -1714,9 +1744,8 @@ public class RFileTest {
     byte data[] = baos.toByteArray();
     SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
     FSDataInputStream in2 = new FSDataInputStream(bais);
-    AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
     CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in2, data.length,
-        CachedConfiguration.getInstance(), aconf);
+        CachedConfiguration.getInstance(), aconf, CryptoServiceFactory.getConfigured(aconf));
     Reader reader = new RFile.Reader(_cbr);
     checkIndex(reader);
 
@@ -1762,7 +1791,7 @@ public class RFileTest {
     reader.close();
   }
 
-  private AccumuloConfiguration setAndGetAccumuloConfig(String cryptoConfSetting) {
+  public static AccumuloConfiguration setAndGetAccumuloConfig(String cryptoConfSetting) {
     ConfigurationCopy result = new ConfigurationCopy(DefaultConfiguration.getInstance());
     Configuration conf = new Configuration(false);
     conf.addResource(cryptoConfSetting);
@@ -1772,15 +1801,33 @@ public class RFileTest {
     return result;
   }
 
+  public void turnCryptoOnInSiteConfig() {
+    SiteConfiguration.clearInstance();
+    SiteConfiguration.getInstance().set(Property.INSTANCE_CRYPTO_SERVICE,
+        AESCryptoService.class.getName());
+    SiteConfiguration.getInstance().set("instance.crypto.opts.kekId", "file:///tmp/testAESFile");
+    SiteConfiguration.getInstance().set("instance.crypto.opts.keyManager", "uri");
+    CryptoServiceFactory.resetInstance();
+  }
+
+  public static void turnCryptoOffInSiteConfig() {
+    SiteConfiguration.getInstance().set(Property.INSTANCE_CRYPTO_SERVICE,
+        NoCryptoService.class.getName());
+    CryptoServiceFactory.resetInstance();
+  }
+
   @Test
   public void testEncRFile1() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test1();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile2() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test2();
     conf = null;
@@ -1788,115 +1835,149 @@ public class RFileTest {
 
   @Test
   public void testEncRFile3() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test3();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile4() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test4();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile5() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test5();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile6() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test6();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile7() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test7();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile8() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test8();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile9() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test9();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile10() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test10();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile11() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test11();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile12() throws Exception {
+    turnCryptoOnInSiteConfig();
+    conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test12();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile13() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test13();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile14() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test14();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile16() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test16();
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile17() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test17();
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile18() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test18();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncRFile19() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test19();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testEncryptedRFiles() throws Exception {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     test1();
     test2();
@@ -1907,6 +1988,7 @@ public class RFileTest {
     test7();
     test8();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   private Key newKey(int r, int c) {
@@ -2200,10 +2282,12 @@ public class RFileTest {
 
   @Test
   public void testEncSample() throws IOException {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     testSample();
     testSampleLG();
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
@@ -2258,6 +2342,7 @@ public class RFileTest {
 
   @Test
   public void testCryptoDoesntLeakSensitive() throws IOException {
+    turnCryptoOnInSiteConfig();
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     // test an empty file
 
@@ -2275,10 +2360,12 @@ public class RFileTest {
         assertEquals(-1, Bytes.indexOf(rfBytes, toCheck));
       }
     }
+    turnCryptoOffInSiteConfig();
   }
 
   @Test
   public void testRootTabletEncryption() throws Exception {
+    turnCryptoOnInSiteConfig();
 
     // This tests that the normal set of operations used to populate a root tablet
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
@@ -2362,5 +2449,6 @@ public class RFileTest {
     testRfile.closeReader();
 
     conf = null;
+    turnCryptoOffInSiteConfig();
   }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
index 0e1b218..ef1af9f 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.accumulo.core.security.crypto.streams.BlockedInputStream;
+import org.apache.accumulo.core.security.crypto.streams.BlockedOutputStream;
 import org.junit.Test;
 
 public class BlockedIOStreamTest {
diff --git a/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java b/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java
index 465162f..b99144a 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java
@@ -17,652 +17,384 @@
 
 package org.apache.accumulo.core.security.crypto;
 
-import static org.junit.Assert.assertArrayEquals;
+import static org.apache.accumulo.core.file.rfile.RFileTest.setAndGetAccumuloConfig;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
+import java.io.OutputStream;
 import java.security.InvalidKeyException;
-import java.security.Key;
 import java.security.NoSuchAlgorithmException;
 import java.security.NoSuchProviderException;
 import java.security.SecureRandom;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Map.Entry;
-import java.util.Random;
+import java.util.Map;
 
-import javax.crypto.AEADBadTagException;
-import javax.crypto.BadPaddingException;
 import javax.crypto.Cipher;
-import javax.crypto.IllegalBlockSizeException;
 import javax.crypto.NoSuchPaddingException;
 import javax.crypto.spec.SecretKeySpec;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.crypto.impl.AESCryptoService;
+import org.apache.accumulo.core.security.crypto.impl.CryptoEnvironmentImpl;
+import org.apache.accumulo.core.security.crypto.impl.KeyManager;
+import org.apache.accumulo.core.security.crypto.impl.NoCryptoService;
+import org.apache.accumulo.core.security.crypto.streams.NoFlushOutputStream;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.crypto.FileDecrypter;
+import org.apache.accumulo.core.spi.crypto.FileEncrypter;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import com.google.common.primitives.Bytes;
-
 public class CryptoTest {
 
-  private static final int MARKER_INT = 0xCADEFEDD;
-  private static final String MARKER_STRING = "1 2 3 a b c";
+  public static final int MARKER_INT = 0xCADEFEDD;
+  public static final String MARKER_STRING = "1 2 3 4 5 6 7 8 a b c d e f g h ";
   public static final String CRYPTO_ON_CONF = "crypto-on-accumulo-site.xml";
   public static final String CRYPTO_OFF_CONF = "crypto-off-accumulo-site.xml";
-  // @formatter:off
-  public static final String CRYPTO_ON_KEK_OFF_CONF =
-    "crypto-on-no-key-encryption-accumulo-site.xml";
-  // @formatter:on
-
-  // Used for kek file testing
-  private static File kekWorks;
-  private static File kekTooLong;
-  private static File kekTooShort;
 
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
-  @Test
-  public void testNoCryptoStream() throws IOException {
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_OFF_CONF);
-
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
-
-    assertNotNull(params);
-    assertEquals("NullCipher", params.getCipherSuite());
-
-    CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf);
-    assertNotNull(cryptoModule);
-    assertTrue(cryptoModule instanceof CryptoModuleFactory.NullCryptoModule);
-
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-    params.setPlaintextOutputStream(out);
-
-    params = cryptoModule.getEncryptingOutputStream(params);
-    assertNotNull(params.getEncryptedOutputStream());
-    assertEquals(out, params.getEncryptedOutputStream());
+  @BeforeClass
+  public static void setupKeyFile() throws Exception {
+    FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
+    String file = "/tmp/testAESFile";
+    Path aesPath = new Path(file);
+    fs.delete(aesPath, true);
+    fs.createNewFile(aesPath);
+    try (FSDataOutputStream out = fs.create(aesPath)) {
+      out.writeUTF("sixteenbytekey"); // 14 + 2 from writeUTF
+    }
   }
 
-  @Test
-  public void testCryptoModuleParamsParsing() {
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
-
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
-
-    assertNotNull(params);
-    assertEquals("AES/GCM/NoPadding", params.getCipherSuite());
-    assertEquals("AES/CBC/NoPadding",
-        params.getAllOptions().get(Property.CRYPTO_WAL_CIPHER_SUITE.getKey()));
-    assertEquals("GCM", params.getCipherSuiteEncryptionMode());
-    assertEquals("AES", params.getKeyAlgorithmName());
-    assertEquals(128, params.getKeyLength());
-    assertEquals("SHA1PRNG", params.getRandomNumberGenerator());
-    assertEquals("SUN", params.getRandomNumberGeneratorProvider());
-    assertEquals("SunJCE", params.getSecurityProvider());
-    assertEquals("org.apache.accumulo.core.security.crypto.CachingHDFSSecretKeyEncryptionStrategy",
-        params.getKeyEncryptionStrategyClass());
+  @AfterClass
+  public static void cleanupKeyFile() throws Exception {
+    FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
+    String file = "/tmp/testAESFile";
+    Path aesPath = new Path(file);
+    fs.delete(aesPath, true);
   }
 
-  @Test
-  public void testCryptoModuleDoesntLeakSensitive() throws IOException {
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
-
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    params.setPlaintextOutputStream(baos);
-
-    CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf);
-
-    cryptoModule.getEncryptingOutputStream(params);
-    params.getEncryptedOutputStream().close();
-
-    // If we get here, we have encrypted bytes
-    byte[] streamBytes = baos.toByteArray();
-    for (Property prop : Property.values()) {
-      if (prop.isSensitive()) {
-        byte[] toCheck = prop.getKey().getBytes();
-        assertEquals(-1, Bytes.indexOf(streamBytes, toCheck));
-      }
-    }
-
+  @Before
+  public void turnCryptoOnInSiteConfig() {
+    SiteConfiguration.getInstance().set(Property.INSTANCE_CRYPTO_SERVICE,
+        AESCryptoService.class.getName());
+    SiteConfiguration.getInstance().set("instance.crypto.opts.kekId", "file:///tmp/testAESFile");
+    SiteConfiguration.getInstance().set("instance.crypto.opts.keyManager", "uri");
+    CryptoServiceFactory.resetInstance();
   }
 
-  @Test
-  public void testCryptoModuleParamsValidation1() throws IOException {
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
-
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
-    CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf);
-
-    assertTrue(cryptoModule instanceof DefaultCryptoModule);
-
-    exception.expect(RuntimeException.class);
-    cryptoModule.getEncryptingOutputStream(params);
+  public static void turnCryptoOffInSiteConfig() {
+    SiteConfiguration.getInstance().set(Property.INSTANCE_CRYPTO_SERVICE,
+        NoCryptoService.class.getName());
+    CryptoServiceFactory.resetInstance();
   }
 
   @Test
-  public void testCryptoModuleParamsValidation2() throws IOException {
+  public void simpleGCMTest() throws Exception {
     AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
 
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
-    CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf);
-
-    assertTrue(cryptoModule instanceof DefaultCryptoModule);
-
-    exception.expect(RuntimeException.class);
-    cryptoModule.getDecryptingInputStream(params);
-  }
+    CryptoService cryptoService = CryptoServiceFactory.getConfigured(conf);
+    CryptoEnvironment encEnv = new CryptoEnvironmentImpl(Scope.RFILE, null);
+    FileEncrypter encrypter = cryptoService.getFileEncrypter(encEnv);
+    byte[] params = encrypter.getDecryptionParameters();
+    assertNotNull(params);
 
-  private String getStringifiedBytes(String s) throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     DataOutputStream dataOut = new DataOutputStream(out);
+    CryptoUtils.writeParams(params, dataOut);
+    OutputStream encrypted = encrypter.encryptStream(dataOut);
 
-    dataOut.writeUTF(s);
-    dataOut.close();
-    byte[] stringMarkerBytes = out.toByteArray();
-    return Arrays.toString(stringMarkerBytes);
-
-  }
+    assertNotNull(encrypted);
+    DataOutputStream cipherOut = new DataOutputStream(encrypted);
 
-  private String getStringifiedBytes(int i) throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    DataOutputStream dataOut = new DataOutputStream(out);
+    cipherOut.writeUTF(MARKER_STRING);
 
-    dataOut.writeInt(i);
+    cipherOut.close();
     dataOut.close();
-    byte[] stringMarkerBytes = out.toByteArray();
-    return Arrays.toString(stringMarkerBytes);
-
+    encrypted.close();
+    out.close();
+
+    byte[] cipherText = out.toByteArray();
+
+    // decrypt
+    ByteArrayInputStream in = new ByteArrayInputStream(cipherText);
+    params = CryptoUtils.readParams(new DataInputStream(in));
+    CryptoEnvironment decEnv = new CryptoEnvironmentImpl(Scope.RFILE, params);
+    FileDecrypter decrypter = cryptoService.getFileDecrypter(decEnv);
+    DataInputStream decrypted = new DataInputStream(decrypter.decryptStream(in));
+    String plainText = decrypted.readUTF();
+    decrypted.close();
+    in.close();
+
+    assertEquals(MARKER_STRING, new String(plainText));
   }
 
   @Test
-  public void testCryptoModuleBasicReadWrite() throws IOException {
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_KEK_OFF_CONF);
-
-    CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf);
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
+  public void testAESCryptoServiceWAL() throws Exception {
+    AESCryptoService cs = new AESCryptoService();
+    byte[] resultingBytes = encrypt(cs, Scope.WAL, CRYPTO_ON_CONF);
 
-    assertTrue(cryptoModule instanceof DefaultCryptoModule);
-
-    byte[] resultingBytes = setUpSampleEncryptedBytes(cryptoModule, params);
-
-    // If we get here, we have encrypted bytes
-    ByteArrayInputStream in = new ByteArrayInputStream(resultingBytes);
-
-    params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
-    params.setEncryptedInputStream(in);
-
-    params = cryptoModule.getDecryptingInputStream(params);
-
-    InputStream plaintextIn = params.getPlaintextInputStream();
+    String stringifiedBytes = Arrays.toString(resultingBytes);
+    String stringifiedMarkerBytes = getStringifiedBytes(null, MARKER_STRING, MARKER_INT);
 
-    assertNotNull(plaintextIn);
-    assertNotSame(plaintextIn, in);
-    DataInputStream dataIn = new DataInputStream(plaintextIn);
-    String markerString = dataIn.readUTF();
-    int markerInt = dataIn.readInt();
+    assertNotEquals(stringifiedBytes, stringifiedMarkerBytes);
 
-    assertEquals(MARKER_STRING, markerString);
-    assertEquals(MARKER_INT, markerInt);
+    decrypt(resultingBytes, Scope.WAL, CRYPTO_ON_CONF);
   }
 
-  private byte[] setUpSampleEncryptedBytes(CryptoModule cryptoModule, CryptoModuleParameters params)
-      throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-    params.setPlaintextOutputStream(new NoFlushOutputStream(out));
-
-    params = cryptoModule.getEncryptingOutputStream(params);
-
-    assertNotNull(params.getEncryptedOutputStream());
-    assertNotSame(params.getEncryptedOutputStream(), out);
-
-    DataOutputStream dataOut = new DataOutputStream(params.getEncryptedOutputStream());
-    dataOut.writeUTF(MARKER_STRING);
-    dataOut.writeInt(MARKER_INT);
-    dataOut.close();
+  @Test
+  public void testAESCryptoServiceRFILE() throws Exception {
+    AESCryptoService cs = new AESCryptoService();
+    byte[] resultingBytes = encrypt(cs, Scope.RFILE, CRYPTO_ON_CONF);
 
-    byte[] resultingBytes = out.toByteArray();
     String stringifiedBytes = Arrays.toString(resultingBytes);
+    String stringifiedMarkerBytes = getStringifiedBytes(null, MARKER_STRING, MARKER_INT);
 
-    String stringifiedMarkerBytes = getStringifiedBytes(MARKER_STRING);
-    String stringifiedOtherBytes = getStringifiedBytes(MARKER_INT);
+    assertNotEquals(stringifiedBytes, stringifiedMarkerBytes);
 
-    // OK, let's make sure it's encrypted
-    assertFalse(stringifiedBytes.contains(stringifiedMarkerBytes));
-    assertFalse(stringifiedBytes.contains(stringifiedOtherBytes));
-    return resultingBytes;
+    decrypt(resultingBytes, Scope.RFILE, CRYPTO_ON_CONF);
   }
 
   @Test
-  public void testKeyEncryptionAndCheckThatFileCannotBeReadWithoutKEK() throws IOException {
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
-
-    CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf);
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
-
-    // CRYPTO_ON_CONF uses AESWrap which produces wrapped keys that are too large and require a
-    // change to
-    // JCE Unlimited Strength Jurisdiction. Using AES/ECB/NoPadding should avoid this problem.
-    params.getAllOptions().put(Property.CRYPTO_DEFAULT_KEY_STRATEGY_CIPHER_SUITE.getKey(),
-        "AES/ECB/NoPadding");
-    assertTrue(cryptoModule instanceof DefaultCryptoModule);
-    assertNotNull(params.getKeyEncryptionStrategyClass());
-    assertEquals("org.apache.accumulo.core.security.crypto.CachingHDFSSecretKeyEncryptionStrategy",
-        params.getKeyEncryptionStrategyClass());
-
-    byte[] resultingBytes = setUpSampleEncryptedBytes(cryptoModule, params);
-
-    // So now that we have bytes encrypted by a key encrypted to a KEK, turn off the KEK
-    // configuration and try
-    // to decrypt. We expect this to fail. This also tests our ability to override the key
-    // encryption strategy.
-    conf = setAndGetAccumuloConfig(CRYPTO_ON_KEK_OFF_CONF);
-    params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
-    params.setOverrideStreamsSecretKeyEncryptionStrategy(true);
+  public void testNoEncryptionWAL() throws Exception {
+    NoCryptoService cs = new NoCryptoService();
+    turnCryptoOffInSiteConfig();
+    byte[] encryptedBytes = encrypt(cs, Scope.WAL, CRYPTO_OFF_CONF);
 
-    ByteArrayInputStream in = new ByteArrayInputStream(resultingBytes);
-    params.setEncryptedInputStream(in);
+    String stringifiedBytes = Arrays.toString(encryptedBytes);
+    String stringifiedMarkerBytes = getStringifiedBytes("U+1F47B".getBytes(), MARKER_STRING,
+        MARKER_INT);
 
-    params = cryptoModule.getDecryptingInputStream(params);
+    assertEquals(stringifiedBytes, stringifiedMarkerBytes);
 
-    assertNotNull(params.getPlaintextInputStream());
-    DataInputStream dataIn = new DataInputStream(params.getPlaintextInputStream());
-    // We expect the following operation to fail and throw an exception
-    exception.expect(IOException.class);
-    @SuppressWarnings("unused")
-    String markerString = dataIn.readUTF();
+    decrypt(encryptedBytes, Scope.WAL, CRYPTO_OFF_CONF);
   }
 
   @Test
-  public void testKeyEncryptionNormalPath() throws IOException {
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
-
-    CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf);
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
+  public void testNoEncryptionRFILE() throws Exception {
+    NoCryptoService cs = new NoCryptoService();
+    turnCryptoOffInSiteConfig();
+    byte[] encryptedBytes = encrypt(cs, Scope.RFILE, CRYPTO_OFF_CONF);
 
-    assertTrue(cryptoModule instanceof DefaultCryptoModule);
-    assertNotNull(params.getKeyEncryptionStrategyClass());
-    assertEquals("org.apache.accumulo.core.security.crypto.CachingHDFSSecretKeyEncryptionStrategy",
-        params.getKeyEncryptionStrategyClass());
-
-    byte[] resultingBytes = setUpSampleEncryptedBytes(cryptoModule, params);
-
-    params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
-    params.setOverrideStreamsSecretKeyEncryptionStrategy(true);
-
-    ByteArrayInputStream in = new ByteArrayInputStream(resultingBytes);
-    params.setEncryptedInputStream(in);
+    String stringifiedBytes = Arrays.toString(encryptedBytes);
+    String stringifiedMarkerBytes = getStringifiedBytes("U+1F47B".getBytes(), MARKER_STRING,
+        MARKER_INT);
 
-    params = cryptoModule.getDecryptingInputStream(params);
+    assertEquals(stringifiedBytes, stringifiedMarkerBytes);
 
-    assertNotNull(params.getPlaintextInputStream());
-    DataInputStream dataIn = new DataInputStream(params.getPlaintextInputStream());
-
-    String markerString = dataIn.readUTF();
-    int markerInt = dataIn.readInt();
-
-    assertEquals(MARKER_STRING, markerString);
-    assertEquals(MARKER_INT, markerInt);
+    decrypt(encryptedBytes, Scope.RFILE, CRYPTO_OFF_CONF);
   }
 
   @Test
-  public void testChangingCryptoParamsAndCanStillDecryptPreviouslyEncryptedFiles()
-      throws IOException {
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
-
-    CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf);
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
-
-    assertTrue(cryptoModule instanceof DefaultCryptoModule);
-    assertNotNull(params.getKeyEncryptionStrategyClass());
-    assertEquals("org.apache.accumulo.core.security.crypto.CachingHDFSSecretKeyEncryptionStrategy",
-        params.getKeyEncryptionStrategyClass());
-
-    byte[] resultingBytes = setUpSampleEncryptedBytes(cryptoModule, params);
-
-    // Now we're going to create a params object and set its algorithm and key length different
-    // from those configured within the site configuration. After doing this, we should
-    // still be able to read the file that was created with a different set of parameters.
-    params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
-    params.setKeyAlgorithmName("DESede");
-    params.setKeyLength(24 * 8);
-
-    ByteArrayInputStream in = new ByteArrayInputStream(resultingBytes);
-    params.setEncryptedInputStream(in);
+  public void testRFileEncrypted() throws Exception {
+    AccumuloConfiguration cryptoOnConf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
+    FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
+    ArrayList<Key> keys = testData();
+
+    String file = "target/testFile1.rf";
+    fs.delete(new Path(file), true);
+    try (RFileWriter writer = RFile.newWriter().to(file).withFileSystem(fs)
+        .withTableProperties(cryptoOnConf).build()) {
+      Value empty = new Value(new byte[] {});
+      writer.startDefaultLocalityGroup();
+      for (Key key : keys) {
+        writer.append(key, empty);
+      }
+    }
 
-    params = cryptoModule.getDecryptingInputStream(params);
+    Scanner iter = RFile.newScanner().from(file).withFileSystem(fs)
+        .withTableProperties(cryptoOnConf).build();
+    ArrayList<Key> keysRead = new ArrayList<>();
+    iter.forEach(e -> keysRead.add(e.getKey()));
+    assertEquals(keys, keysRead);
+  }
 
-    assertNotNull(params.getPlaintextInputStream());
-    DataInputStream dataIn = new DataInputStream(params.getPlaintextInputStream());
-    String markerString = dataIn.readUTF();
-    int markerInt = dataIn.readInt();
+  @Test
+  // This test is to ensure when Crypto is configured that it can read unencrypted files
+  public void testReadNoCryptoWithCryptoConfigured() throws Exception {
+    AccumuloConfiguration cryptoOffConf = setAndGetAccumuloConfig(CRYPTO_OFF_CONF);
+    AccumuloConfiguration cryptoOnConf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
+    FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
+    ArrayList<Key> keys = testData();
+
+    turnCryptoOffInSiteConfig();
+    String file = "target/testFile2.rf";
+    fs.delete(new Path(file), true);
+    try (RFileWriter writer = RFile.newWriter().to(file).withFileSystem(fs)
+        .withTableProperties(cryptoOffConf).build()) {
+      Value empty = new Value(new byte[] {});
+      writer.startDefaultLocalityGroup();
+      for (Key key : keys) {
+        writer.append(key, empty);
+      }
+    }
 
-    assertEquals(MARKER_STRING, markerString);
-    assertEquals(MARKER_INT, markerInt);
+    turnCryptoOnInSiteConfig();
+    CryptoServiceFactory.resetInstance();
+    Scanner iter = RFile.newScanner().from(file).withFileSystem(fs)
+        .withTableProperties(cryptoOnConf).build();
+    ArrayList<Key> keysRead = new ArrayList<>();
+    iter.forEach(e -> keysRead.add(e.getKey()));
+    assertEquals(keys, keysRead);
   }
 
-  private AccumuloConfiguration setAndGetAccumuloConfig(String cryptoConfSetting) {
-    ConfigurationCopy result = new ConfigurationCopy(DefaultConfiguration.getInstance());
+  @Test
+  public void testMissingConfigProperties()
+      throws ClassNotFoundException, InstantiationException, IllegalAccessException {
+    ConfigurationCopy aconf = new ConfigurationCopy(DefaultConfiguration.getInstance());
     Configuration conf = new Configuration(false);
-    conf.addResource(cryptoConfSetting);
-    for (Entry<String,String> e : conf) {
-      result.set(e.getKey(), e.getValue());
+    for (Map.Entry<String,String> e : conf) {
+      aconf.set(e.getKey(), e.getValue());
     }
-    return result;
+    aconf.set(Property.INSTANCE_CRYPTO_SERVICE,
+        "org.apache.accumulo.core.security.crypto.impl.AESCryptoService");
+    String configuredClass = aconf.get(Property.INSTANCE_CRYPTO_SERVICE.getKey());
+    Class<? extends CryptoService> clazz = AccumuloVFSClassLoader.loadClass(configuredClass,
+        CryptoService.class);
+    CryptoService cs = clazz.newInstance();
+
+    exception.expect(NullPointerException.class);
+    cs.init(aconf.getAllPropertiesWithPrefix(Property.TABLE_PREFIX));
+    assertEquals(AESCryptoService.class, cs.getClass());
   }
 
   @Test
-  public void testKeyWrapAndUnwrap() throws NoSuchAlgorithmException, NoSuchPaddingException,
-      NoSuchProviderException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException {
-    Cipher keyWrapCipher = Cipher.getInstance("AESWrap/ECB/NoPadding");
-    SecureRandom random = SecureRandom.getInstance("SHA1PRNG", "SUN");
-
-    byte[] kek = new byte[16];
-    random.nextBytes(kek);
-    byte[] randomKey = new byte[16];
-    random.nextBytes(randomKey);
-
-    keyWrapCipher.init(Cipher.WRAP_MODE, new SecretKeySpec(kek, "AES"));
-
-    Key randKey = new SecretKeySpec(randomKey, "AES");
-
-    byte[] wrappedKey = keyWrapCipher.wrap(randKey);
-
-    assertNotNull(wrappedKey);
-    // AESWrap will produce 24 bytes given 128 bits of key data with a 128-bit KEK
-    assertEquals(wrappedKey.length, randomKey.length + 8);
-
-    Cipher keyUnwrapCipher = Cipher.getInstance("AESWrap/ECB/NoPadding");
-    keyUnwrapCipher.init(Cipher.UNWRAP_MODE, new SecretKeySpec(kek, "AES"));
-    Key unwrappedKey = keyUnwrapCipher.unwrap(wrappedKey, "AES", Cipher.SECRET_KEY);
-
-    byte[] unwrappedKeyBytes = unwrappedKey.getEncoded();
-    assertArrayEquals(unwrappedKeyBytes, randomKey);
-
+  public void testKeyManagerGeneratesKey() throws NoSuchAlgorithmException, NoSuchProviderException,
+      NoSuchPaddingException, InvalidKeyException {
+    SecureRandom sr = SecureRandom.getInstance("SHA1PRNG", "SUN");
+    java.security.Key key;
+    key = KeyManager.generateKey(sr, 16);
+    Cipher.getInstance("AES/CBC/NoPadding").init(Cipher.ENCRYPT_MODE, key);
+
+    key = KeyManager.generateKey(sr, 24);
+    key = KeyManager.generateKey(sr, 32);
+    key = KeyManager.generateKey(sr, 11);
+
+    exception.expect(InvalidKeyException.class);
+    Cipher.getInstance("AES/CBC/NoPadding").init(Cipher.ENCRYPT_MODE, key);
   }
 
   @Test
-  public void testKeyEncryptionKeyCatchCorrectlyUsesValidKEKFile() throws IOException {
-    kekWorks = createKekFile("kekWorks.kek", 16);
-    testKekFile(kekWorks);
+  public void testKeyManagerWrapAndUnwrap()
+      throws NoSuchAlgorithmException, NoSuchProviderException {
+    SecureRandom sr = SecureRandom.getInstance("SHA1PRNG", "SUN");
+    java.security.Key kek = KeyManager.generateKey(sr, 16);
+    java.security.Key fek = KeyManager.generateKey(sr, 16);
+    byte[] wrapped = KeyManager.wrapKey(fek, kek);
+    assertFalse(Arrays.equals(fek.getEncoded(), wrapped));
+    java.security.Key unwrapped = KeyManager.unwrapKey(wrapped, kek);
+    assertEquals(unwrapped, fek);
   }
 
   @Test
-  public void testKeyEncryptionKeyCacheCorrectlyFailsWithInvalidLongKEKFile() throws IOException {
-    kekTooLong = createKekFile("kekTooLong.kek", 8);
-    exception.expect(IOException.class);
-    testKekFile(kekTooLong);
+  public void testKeyManagerLoadKekFromUri() throws IOException {
+    SecretKeySpec fileKey = KeyManager.loadKekFromUri("file:///tmp/testAESFile");
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    dos.writeUTF("sixteenbytekey");
+    SecretKeySpec handKey = new SecretKeySpec(baos.toByteArray(), "AES");
+    assertEquals(fileKey, handKey);
   }
 
-  @Test
-  public void testKeyEncryptionKeyCacheCorrectlyFailsWithInvalidShortKEKFile() throws IOException {
-    kekTooShort = createKekFile("kekTooShort.kek", 32);
-    exception.expect(IOException.class);
-    testKekFile(kekTooShort);
+  private ArrayList<Key> testData() {
+    ArrayList<Key> keys = new ArrayList<>();
+    keys.add(new Key("a", "cf", "cq"));
+    keys.add(new Key("a1", "cf", "cq"));
+    keys.add(new Key("a2", "cf", "cq"));
+    keys.add(new Key("a3", "cf", "cq"));
+    return keys;
   }
 
-  // Used to check reading of KEK files
-  @SuppressWarnings("deprecation")
-  private void testKekFile(File testFile) throws IOException {
-    assertTrue(testFile.exists());
-    assertFalse(testFile.isDirectory());
-
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
-    // TODO ACCUMULO-2530 this will need to be fixed when CachingHDFSSecretKeyEncryptionStrategy is
-    // fixed
-    params.getAllOptions().put(Property.INSTANCE_DFS_DIR.getKey(), testFile.getParent());
-    byte[] ptk = new byte[16];
-    params.setPlaintextKey(ptk);
-    CachingHDFSSecretKeyEncryptionStrategy skc = new CachingHDFSSecretKeyEncryptionStrategy();
-    params.getAllOptions().put(Property.CRYPTO_DEFAULT_KEY_STRATEGY_KEY_LOCATION.getKey(),
-        testFile.getName());
-    skc.encryptSecretKey(params);
-  }
+  private <C extends CryptoService> byte[] encrypt(C cs, Scope scope, String configFile)
+      throws Exception {
+    AccumuloConfiguration conf = setAndGetAccumuloConfig(configFile);
+    CryptoService cryptoService = CryptoServiceFactory.getConfigured(conf);
+    CryptoEnvironmentImpl env = new CryptoEnvironmentImpl(scope, null);
+    FileEncrypter encrypter = cryptoService.getFileEncrypter(env);
+    byte[] params = encrypter.getDecryptionParameters();
 
-  private File createKekFile(String filename, Integer size) throws IOException {
-    File dir = new File(System.getProperty("user.dir") + "/target/cryptoTest");
-    // must do something with return value to avoid findbugs
-    boolean foilFindbugs = dir.mkdirs(); // if the directories don't already exist, it'll return 1.
-                                         // If they do, 0. Both cases can be fine.
-    // must do something with java var to avoid java warning
-    foilFindbugs = !foilFindbugs;
-
-    File testFile = File.createTempFile(filename, ".kek", dir);
-    DataOutputStream os = new DataOutputStream(new FileOutputStream(testFile));
-    Integer kl = 16;
-    byte[] key = new byte[kl];
-    Random rand = new Random();
-    rand.nextBytes(key);
-    os.writeInt(size);
-    os.write(key);
-    os.flush();
-    os.close();
-
-    return testFile;
+    assertNotNull("CryptoService returned null FileEncrypter", encrypter);
+    assertEquals(cryptoService.getClass(), cs.getClass());
 
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(out);
+    CryptoUtils.writeParams(params, dataOut);
+    DataOutputStream encrypted = new DataOutputStream(
+        encrypter.encryptStream(new NoFlushOutputStream(dataOut)));
+    assertNotNull(encrypted);
+
+    encrypted.writeUTF(MARKER_STRING);
+    encrypted.writeInt(MARKER_INT);
+    encrypted.close();
+    dataOut.close();
+    out.close();
+    return out.toByteArray();
   }
 
-  public void AESGCM_Encryption_Test_Correct_Encryption_And_Decryption()
-      throws IOException, AEADBadTagException {
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
-    byte[] encryptedBytes = testEncryption(conf, new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20});
-    Integer result = testDecryption(conf, encryptedBytes);
-    assertEquals(result, Integer.valueOf(1));
-  }
+  private void decrypt(byte[] resultingBytes, Scope scope, String configFile) throws Exception {
+    ByteArrayInputStream in = new ByteArrayInputStream(resultingBytes);
+    DataInputStream dataIn = new DataInputStream(in);
+    byte[] params = CryptoUtils.readParams(dataIn);
 
-  @Test
-  public void AESGCM_Encryption_Test_Tag_Integrity_Compromised()
-      throws IOException, AEADBadTagException {
-    AccumuloConfiguration conf = setAndGetAccumuloConfig(CRYPTO_ON_CONF);
-    byte[] encryptedBytes = testEncryption(conf, new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20});
-
-    encryptedBytes[encryptedBytes.length - 1]++; // modify the tag
-    exception.expect(AEADBadTagException.class);
-    testDecryption(conf, encryptedBytes);
-    encryptedBytes[encryptedBytes.length - 1]--;
-    encryptedBytes[1486]++; // modify the data
-    exception.expect(AEADBadTagException.class);
-    testDecryption(conf, encryptedBytes);
-  }
+    AccumuloConfiguration conf = setAndGetAccumuloConfig(configFile);
+    CryptoService cryptoService = CryptoServiceFactory.getConfigured(conf);
+    CryptoEnvironment env = new CryptoEnvironmentImpl(scope, params);
 
-  @Test
-  public void testIVIncrements() {
-    // One byte
-    byte[] testIv1 = new byte[1];
-    // 11111110
-    testIv1[0] = (byte) 0xFE;
-
-    // 11111111
-    CryptoModuleParameters.incrementIV(testIv1, testIv1.length - 1);
-    assertArrayEquals(testIv1, new byte[] {(byte) 0xff});
-
-    // 00000000
-    CryptoModuleParameters.incrementIV(testIv1, testIv1.length - 1);
-    assertArrayEquals(testIv1, new byte[] {(byte) 0x00});
-
-    // Two bytes
-    byte[] testIv2 = new byte[2];
-    // 00000000 11111110
-    testIv2[0] = (byte) 0x00;
-    testIv2[1] = (byte) 0xFE;
-
-    // 00000000 11111111
-    CryptoModuleParameters.incrementIV(testIv2, testIv2.length - 1);
-    assertArrayEquals(testIv2, new byte[] {(byte) 0x00, (byte) 0xFF});
-
-    // 00000001 00000000
-    CryptoModuleParameters.incrementIV(testIv2, testIv2.length - 1);
-    assertArrayEquals(testIv2, new byte[] {(byte) 0x01, (byte) 0x00});
-
-    // 00000001 00000001
-    CryptoModuleParameters.incrementIV(testIv2, testIv2.length - 1);
-    assertArrayEquals(testIv2, new byte[] {(byte) 0x01, (byte) 0x01});
-
-    // 11111111 11111111
-    testIv2[0] = (byte) 0xFF;
-    testIv2[1] = (byte) 0xFF;
-
-    // 00000000 00000000
-    CryptoModuleParameters.incrementIV(testIv2, testIv2.length - 1);
-    assertArrayEquals(testIv2, new byte[] {(byte) 0x00, (byte) 0x00});
-
-    // Three bytes
-    byte[] testIv3 = new byte[3];
-    // 00000000 00000000 11111110
-    testIv3[0] = (byte) 0x00;
-    testIv3[1] = (byte) 0x00;
-    testIv3[2] = (byte) 0xFE;
-
-    // 00000000 00000000 11111111
-    CryptoModuleParameters.incrementIV(testIv3, testIv3.length - 1);
-    assertArrayEquals(testIv3, new byte[] {(byte) 0x00, (byte) 0x00, (byte) 0xFF});
-
-    // 00000000 00000001 00000000
-    CryptoModuleParameters.incrementIV(testIv3, testIv3.length - 1);
-    assertArrayEquals(testIv3, new byte[] {(byte) 0x00, (byte) 0x01, (byte) 0x00});
-
-    // 00000000 00000001 00000001
-    CryptoModuleParameters.incrementIV(testIv3, testIv3.length - 1);
-    assertArrayEquals(testIv3, new byte[] {(byte) 0x00, (byte) 0x01, (byte) 0x01});
-
-    // 00000000 11111111 11111110
-    testIv3[0] = (byte) 0x00;
-    testIv3[1] = (byte) 0xFF;
-    testIv3[2] = (byte) 0xFE;
-
-    // 00000000 11111111 11111111
-    CryptoModuleParameters.incrementIV(testIv3, testIv3.length - 1);
-    assertArrayEquals(testIv3, new byte[] {(byte) 0x00, (byte) 0xFF, (byte) 0xFF});
-
-    // 00000001 00000000 00000000
-    CryptoModuleParameters.incrementIV(testIv3, testIv3.length - 1);
-    assertArrayEquals(testIv3, new byte[] {(byte) 0x01, (byte) 0x00, (byte) 0x00});
-
-    // 00000001 00000000 00000001
-    CryptoModuleParameters.incrementIV(testIv3, testIv3.length - 1);
-    assertArrayEquals(testIv3, new byte[] {(byte) 0x01, (byte) 0x00, (byte) 0x01});
-
-    // 11111111 11111111 11111110
-    testIv3[0] = (byte) 0xFF;
-    testIv3[1] = (byte) 0xFF;
-    testIv3[2] = (byte) 0xFE;
-
-    // 11111111 11111111 11111111
-    CryptoModuleParameters.incrementIV(testIv3, testIv3.length - 1);
-    assertArrayEquals(testIv3, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF});
-
-    // 00000000 00000000 00000000
-    CryptoModuleParameters.incrementIV(testIv3, testIv3.length - 1);
-    assertArrayEquals(testIv3, new byte[] {(byte) 0x00, (byte) 0x00, (byte) 0x00});
-
-    // 00000000 00000000 00000001
-    CryptoModuleParameters.incrementIV(testIv3, testIv3.length - 1);
-    assertArrayEquals(testIv3, new byte[] {(byte) 0x00, (byte) 0x00, (byte) 0x01});
+    FileDecrypter decrypter = cryptoService.getFileDecrypter(env);
 
-  }
+    DataInputStream decrypted = new DataInputStream(decrypter.decryptStream(dataIn));
+    String markerString = decrypted.readUTF();
+    int markerInt = decrypted.readInt();
 
-  /**
-   * Used in AESGCM unit tests to encrypt data. Uses MARKER_STRING and MARKER_INT
-   *
-   * @param conf
-   *          The accumulo configuration
-   * @param initVector
-   *          The IV to be used in encryption
-   * @return the encrypted string
-   * @throws IOException
-   *           if DataOutputStream fails
-   */
-  private static byte[] testEncryption(AccumuloConfiguration conf, byte[] initVector)
-      throws IOException {
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
-    CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf);
-    params.getAllOptions().put(Property.CRYPTO_WAL_CIPHER_SUITE.getKey(), "AES/GCM/NoPadding");
-    params.setInitializationVector(initVector);
-
-    /*
-     * Now lets encrypt this data!
-     */
-    ByteArrayOutputStream encryptedByteStream = new ByteArrayOutputStream();
-    params.setPlaintextOutputStream(new NoFlushOutputStream(encryptedByteStream));
-    params = cryptoModule.getEncryptingOutputStream(params);
-    DataOutputStream encryptedDataStream = new DataOutputStream(params.getEncryptedOutputStream());
-    encryptedDataStream.writeUTF(MARKER_STRING);
-    encryptedDataStream.writeInt(MARKER_INT);
-    encryptedDataStream.close();
-    return (encryptedByteStream.toByteArray());
+    assertEquals(MARKER_STRING, markerString);
+    assertEquals(MARKER_INT, markerInt);
+    in.close();
+    dataIn.close();
   }
 
-  /**
-   * Used in AESGCM unit tests to decrypt data. Uses MARKER_STRING and MARKER_INT
-   *
-   * @param conf
-   *          The accumulo configuration
-   * @param encryptedBytes
-   *          The encrypted bytes
-   * @return 0 if data is incorrectly decrypted, 1 if decrypted data matches input
-   * @throws IOException
-   *           if DataInputStream fails
-   * @throws AEADBadTagException
-   *           if the encrypted stream has been modified
-   */
-  private static Integer testDecryption(AccumuloConfiguration conf, byte[] encryptedBytes)
-      throws IOException, AEADBadTagException {
-    CryptoModuleParameters params = CryptoModuleFactory
-        .createParamsObjectFromAccumuloConfiguration(conf);
-    CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf);
-    ByteArrayInputStream decryptedByteStream = new ByteArrayInputStream(encryptedBytes);
-    params.setEncryptedInputStream(decryptedByteStream);
-    params = cryptoModule.getDecryptingInputStream(params);
-    DataInputStream decryptedDataStream = new DataInputStream(params.getPlaintextInputStream());
-
-    String utf;
-    Integer in;
-    try {
-      utf = decryptedDataStream.readUTF();
-      in = decryptedDataStream.readInt();
-    } catch (IOException e) {
-      if (e.getCause().getClass().equals(AEADBadTagException.class)) {
-        throw new AEADBadTagException();
-      } else {
-        throw e;
-      }
-    }
+  private String getStringifiedBytes(byte[] params, String s, int i) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(out);
 
-    decryptedDataStream.close();
-    if (utf.equals(MARKER_STRING) && in.equals(MARKER_INT)) {
-      return 1;
-    } else {
-      return 0;
+    if (params != null) {
+      dataOut.writeInt(params.length);
+      dataOut.write(params);
     }
+    dataOut.writeUTF(s);
+    dataOut.writeInt(i);
+    dataOut.close();
+    byte[] stringMarkerBytes = out.toByteArray();
+    return Arrays.toString(stringMarkerBytes);
   }
+
 }
diff --git a/core/src/test/resources/crypto-on-accumulo-site.xml b/core/src/test/resources/crypto-on-accumulo-site.xml
index 8192d6d..6aab2c2 100644
--- a/core/src/test/resources/crypto-on-accumulo-site.xml
+++ b/core/src/test/resources/crypto-on-accumulo-site.xml
@@ -1,4 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
@@ -15,16 +16,26 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 
 <configuration>
-  
+    <property>
+        <name>instance.crypto.service</name>
+        <value>org.apache.accumulo.core.security.crypto.impl.AESCryptoService</value>
+    </property>
+    <property>
+        <name>instance.crypto.opts.kekId</name>
+        <value>file:///tmp/testAESFile</value>
+    </property>
+    <property>
+        <name>instance.crypto.opts.keyManager</name>
+        <value>uri</value>
+    </property>
+
     <property>
       <name>instance.zookeeper.host</name>
       <value>localhost:2181</value>
       <description>comma separated list of zookeeper servers</description>
     </property>
-
     <property>
       <name>instance.secret</name>
       <value>DEFAULT</value>
@@ -50,84 +61,12 @@
     </property>
     
     <property>
-      <name>trace.password</name>
-      <!-- 
-        change this to the root user's password, and/or change the user below 
-       -->
-      <value>password</value>
-    </property>
-    
-    <property>
-      <name>trace.user</name>
-      <value>root</value>
-    </property>
-    
-    <property>
       <name>tserver.sort.buffer.size</name>
       <value>50M</value>
     </property>
     
     <property>
       <name>tserver.walog.max.size</name>
-      <value>100M</value>
-    </property>
-    <property>
-      <name>crypto.module.class</name>
-      <value>org.apache.accumulo.core.security.crypto.DefaultCryptoModule</value>
-    </property>
-    <property>
-      <name>crypto.security.provider</name>
-      <value>SunJCE</value>
-    </property>
-    <property>
-      <name>crypto.cipher.suite</name>
-      <value>AES/GCM/NoPadding</value>
-    </property>
-    <property>
-      <name>crypto.wal.cipher.suite</name>
-      <value>AES/CBC/NoPadding</value>
+      <value>512M</value>
     </property>
-    <property>
-      <name>crypto.cipher.key.algorithm.name</name>
-      <value>AES</value>
-    </property>
-    <property>
-      <name>crypto.cipher.key.length</name>
-      <value>128</value>
-    </property>
-    <property>
-      <name>crypto.secure.rng</name>
-      <value>SHA1PRNG</value>
-    </property>
-    <property>
-      <name>crypto.secure.rng.provider</name>
-      <value>SUN</value>
-    </property>
-    <property>
-      <name>crypto.security.provider</name>
-      <value>SunJCE</value>
-    </property>
-    <property>
-      <name>crypto.secret.key.encryption.strategy.class</name>
-      <value>org.apache.accumulo.core.security.crypto.CachingHDFSSecretKeyEncryptionStrategy</value>
-    </property>
-    <property>
-      <name>instance.dfs.dir</name>
-      <value>target/dfs</value>
-    </property>
-    <property>
-      <name>instance.dfs.uri</name>
-      <value>file:///</value>
-    </property>
-    
-    <property>
-      <name>crypto.default.key.strategy.key.location</name>
-      <value>test.secret.key</value>
-    </property>
-    
-    <property>
-    	<name>crypto.default.key.strategy.cipher.suite</name>
-    	<value>AESWrap/ECB/NoPadding</value>
-    </property>
-
 </configuration>
diff --git a/core/src/test/resources/crypto-on-no-key-encryption-accumulo-site.xml b/core/src/test/resources/crypto-on-no-key-encryption-accumulo-site.xml
deleted file mode 100644
index f477682..0000000
--- a/core/src/test/resources/crypto-on-no-key-encryption-accumulo-site.xml
+++ /dev/null
@@ -1,111 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<configuration>
-
-    <property>
-      <name>instance.zookeeper.host</name>
-      <value>localhost:2181</value>
-      <description>comma separated list of zookeeper servers</description>
-    </property>
-
-    <property>
-      <name>instance.secret</name>
-      <value>DEFAULT</value>
-      <description>A secret unique to a given instance that all servers must know in order to communicate with one another.
-                   Change it before initialization. To change it later use ./bin/accumulo org.apache.accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd],
-                   and then update this file.
-      </description>
-    </property>
-
-    <property>
-      <name>tserver.memory.maps.max</name>
-      <value>80M</value>
-    </property>
-
-    <property>
-      <name>tserver.cache.data.size</name>
-      <value>7M</value>
-    </property>
-
-    <property>
-      <name>tserver.cache.index.size</name>
-      <value>20M</value>
-    </property>
-
-    <property>
-      <name>trace.password</name>
-      <!--
-        change this to the root user's password, and/or change the user below
-       -->
-      <value>password</value>
-    </property>
-
-    <property>
-      <name>trace.user</name>
-      <value>root</value>
-    </property>
-
-    <property>
-      <name>tserver.sort.buffer.size</name>
-      <value>50M</value>
-    </property>
-
-    <property>
-      <name>tserver.walog.max.size</name>
-      <value>100M</value>
-    </property>
-    <property>
-      <name>crypto.module.class</name>
-      <value>org.apache.accumulo.core.security.crypto.DefaultCryptoModule</value>
-    </property>
-    <property>
-      <name>crypto.cipher.suite</name>
-      <value>AES/CFB/NoPadding</value>
-    </property>
-    <property>
-      <name>crypto.wal.cipher.suite</name>
-      <value>AES/CBC/NoPadding</value>
-    </property>
-    <property>
-      <name>crypto.cipher.key.algorithm.name</name>
-      <value>AES</value>
-    </property>
-    <property>
-      <name>crypto.cipher.key.length</name>
-      <value>128</value>
-    </property>
-    <property>
-      <name>crypto.secure.rng</name>
-      <value>SHA1PRNG</value>
-    </property>
-    <property>
-      <name>crypto.secure.rng.provider</name>
-      <value>SUN</value>
-    </property>
-    <property>
-      <name>instance.dfs.dir</name>
-      <value>/tmp</value>
-    </property>
-    <property>
-      <name>instance.dfs.uri</name>
-      <value>file:///</value>
-    </property>
-
-</configuration>
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
index 6ac37bd..2c4524d 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
@@ -659,6 +659,15 @@ public class MiniAccumuloConfigImpl {
   }
 
   /**
+   * Sets arbitrary configuration properties.
+   *
+   * @since 2.0.0
+   */
+  public void setProperty(String p, String value) {
+    this.siteConfig.put(p, value);
+  }
+
+  /**
    * @return the useCredentialProvider
    */
   public boolean isUseCredentialProvider() {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 059ac58..80d6bc2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -482,7 +482,8 @@ public class Initialize implements KeywordExecutable {
 
   private void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, String rootTabletDir)
       throws IOException {
-    initDirs(fs, uuid, VolumeConfiguration.getVolumeUris(SiteConfiguration.getInstance()), false);
+    AccumuloConfiguration siteConf = SiteConfiguration.getInstance();
+    initDirs(fs, uuid, VolumeConfiguration.getVolumeUris(siteConf), false);
 
     // initialize initial system tables config in zookeeper
     initSystemTablesConfig(Constants.ZROOT + "/" + uuid);
@@ -508,7 +509,7 @@ public class Initialize implements KeywordExecutable {
     String metadataFileName = tableMetadataTabletDir + Path.SEPARATOR + "0_1." + ext;
     Tablet replicationTablet = new Tablet(ReplicationTable.ID, replicationTableDefaultTabletDir,
         null, null);
-    createMetadataFile(fs, metadataFileName, replicationTablet);
+    createMetadataFile(fs, metadataFileName, siteConf, replicationTablet);
 
     // populate the root tablet with info about the metadata table's two initial tablets
     String rootTabletFileName = rootTabletDir + Path.SEPARATOR + "00000_00000." + ext;
@@ -516,7 +517,7 @@ public class Initialize implements KeywordExecutable {
     Tablet tablesTablet = new Tablet(MetadataTable.ID, tableMetadataTabletDir, null, splitPoint,
         metadataFileName);
     Tablet defaultTablet = new Tablet(MetadataTable.ID, defaultMetadataTabletDir, splitPoint, null);
-    createMetadataFile(fs, rootTabletFileName, tablesTablet, defaultTablet);
+    createMetadataFile(fs, rootTabletFileName, siteConf, tablesTablet, defaultTablet);
   }
 
   private static class Tablet {
@@ -535,7 +536,7 @@ public class Initialize implements KeywordExecutable {
   }
 
   private static void createMetadataFile(VolumeManager volmanager, String fileName,
-      Tablet... tablets) throws IOException {
+      AccumuloConfiguration conf, Tablet... tablets) throws IOException {
     // sort file contents in memory, then play back to the file
     TreeMap<Key,Value> sorted = new TreeMap<>();
     for (Tablet tablet : tablets) {
@@ -543,8 +544,7 @@ public class Initialize implements KeywordExecutable {
     }
     FileSystem fs = volmanager.getVolumeByPath(new Path(fileName)).getFileSystem();
     FileSKVWriter tabletWriter = FileOperations.getInstance().newWriterBuilder()
-        .forFile(fileName, fs, fs.getConf())
-        .withTableConfiguration(DefaultConfiguration.getInstance()).build();
+        .forFile(fileName, fs, fs.getConf()).withTableConfiguration(conf).build();
     tabletWriter.startDefaultLocalityGroup();
 
     for (Entry<Key,Value> entry : sorted.entrySet()) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
index 5d7ae1a..152c9eb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.data.impl.TabletIdImpl;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.summary.Gatherer;
 import org.apache.accumulo.core.summary.SummarizerFactory;
@@ -150,8 +151,9 @@ public class MajorCompactionRequest implements Cloneable {
     for (FileRef file : files) {
       FileSystem fs = volumeManager.getVolumeByPath(file.path()).getFileSystem();
       Configuration conf = CachedConfiguration.getInstance();
-      SummaryCollection fsc = SummaryReader.load(fs, conf, tableConfig, factory, file.path(),
-          summarySelector, summaryCache, indexCache)
+      SummaryCollection fsc = SummaryReader
+          .load(fs, conf, tableConfig, factory, file.path(), summarySelector, summaryCache,
+              indexCache, CryptoServiceFactory.getConfigured(tableConfig))
           .getSummaries(Collections.singletonList(new Gatherer.RowRange(extent)));
       sc.merge(fsc, factory);
     }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 5cb52a9..24ada5e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.tserver.log;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.security.crypto.impl.CryptoEnvironmentImpl.Scope;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
@@ -25,7 +26,6 @@ import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.Method;
@@ -33,9 +33,7 @@ import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -46,11 +44,15 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.security.crypto.CryptoModule;
-import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
-import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
-import org.apache.accumulo.core.security.crypto.DefaultCryptoModule;
-import org.apache.accumulo.core.security.crypto.NoFlushOutputStream;
+import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.security.crypto.CryptoUtils;
+import org.apache.accumulo.core.security.crypto.impl.CryptoEnvironmentImpl;
+import org.apache.accumulo.core.security.crypto.impl.NoCryptoService;
+import org.apache.accumulo.core.security.crypto.streams.NoFlushOutputStream;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.crypto.FileDecrypter;
+import org.apache.accumulo.core.spi.crypto.FileEncrypter;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.fate.util.LoggingRunnable;
@@ -77,8 +79,15 @@ import com.google.common.base.Joiner;
  *
  */
 public class DfsLogger implements Comparable<DfsLogger> {
+  // older versions should no longer be supported in 2.0
   public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
   public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
+  /**
+   * Simplified encryption technique supported in V4.
+   *
+   * @since 2.0
+   */
+  public static final String LOG_FILE_HEADER_V4 = "--- Log File Header (v4) ---";
 
   private static final Logger log = LoggerFactory.getLogger(DfsLogger.class);
   private static final DatanodeInfo[] EMPTY_PIPELINE = new DatanodeInfo[0];
@@ -353,89 +362,27 @@ public class DfsLogger implements Comparable<DfsLogger> {
 
   public static DFSLoggerInputStreams readHeaderAndReturnStream(FSDataInputStream input,
       AccumuloConfiguration conf) throws IOException {
-    DataInputStream decryptingInput = null;
+    DataInputStream decryptingInput;
 
-    byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes(UTF_8);
+    byte[] magic = DfsLogger.LOG_FILE_HEADER_V4.getBytes(UTF_8);
     byte[] magicBuffer = new byte[magic.length];
     try {
       input.readFully(magicBuffer);
       if (Arrays.equals(magicBuffer, magic)) {
-        // additional parameters it needs from the underlying stream.
-        String cryptoModuleClassname = input.readUTF();
-        CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
-
-        // Create the parameters and set the input stream into those parameters
-        CryptoModuleParameters params = CryptoModuleFactory
-            .createParamsObjectFromAccumuloConfiguration(conf);
-        params.setEncryptedInputStream(input);
-
-        // Create the plaintext input stream from the encrypted one
-        params = cryptoModule.getDecryptingInputStream(params);
-
-        if (params.getPlaintextInputStream() instanceof DataInputStream) {
-          decryptingInput = (DataInputStream) params.getPlaintextInputStream();
-        } else {
-          decryptingInput = new DataInputStream(params.getPlaintextInputStream());
-        }
+        byte[] params = CryptoUtils.readParams(input);
+        CryptoService cryptoService = CryptoServiceFactory.getConfigured(conf);
+        CryptoEnvironment env = new CryptoEnvironmentImpl(Scope.WAL, params);
+
+        FileDecrypter decrypter = cryptoService.getFileDecrypter(env);
+        log.debug("Using {} for decrypting WAL", cryptoService.getClass().getSimpleName());
+        decryptingInput = cryptoService instanceof NoCryptoService ? input
+            : new DataInputStream(decrypter.decryptStream(input));
       } else {
+        log.error("Unsupported WAL version.");
         input.seek(0);
-        byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes(UTF_8);
-        byte[] magicBufferV2 = new byte[magicV2.length];
-        input.readFully(magicBufferV2);
-
-        if (Arrays.equals(magicBufferV2, magicV2)) {
-          // Log files from 1.5 dump their options in raw to the logger files. Since we don't know
-          // the class
-          // that needs to read those files, we can make a couple of basic assumptions. Either it's
-          // going to be
-          // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
-
-          // If it's null, we won't have any parameters whatsoever. First, let's attempt to read
-          // parameters
-          Map<String,String> opts = new HashMap<>();
-          int count = input.readInt();
-          for (int i = 0; i < count; i++) {
-            String key = input.readUTF();
-            String value = input.readUTF();
-            opts.put(key, value);
-          }
-
-          if (opts.size() == 0) {
-            // NullCryptoModule, we're done
-            decryptingInput = input;
-          } else {
-
-            // The DefaultCryptoModule will want to read the parameters from the underlying file, so
-            // we will put the file back to that spot.
-            // @formatter:off
-            org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule =
-              org.apache.accumulo.core.security.crypto.CryptoModuleFactory
-                .getCryptoModule(DefaultCryptoModule.class.getName());
-            // @formatter:on
-
-            CryptoModuleParameters params = CryptoModuleFactory
-                .createParamsObjectFromAccumuloConfiguration(conf);
-
-            // go back to the beginning, but skip over magicV2 already checked earlier
-            input.seek(magicV2.length);
-            params.setEncryptedInputStream(input);
-
-            params = cryptoModule.getDecryptingInputStream(params);
-            if (params.getPlaintextInputStream() instanceof DataInputStream) {
-              decryptingInput = (DataInputStream) params.getPlaintextInputStream();
-            } else {
-              decryptingInput = new DataInputStream(params.getPlaintextInputStream());
-            }
-          }
-
-        } else {
-
-          input.seek(0);
-          decryptingInput = input;
-        }
-
+        decryptingInput = input;
       }
-    } catch (EOFException e) {
+    } catch (Exception e) {
       log.warn("Got EOFException trying to read WAL header information,"
           + " assuming the rest of the file has no data.");
       // A TabletServer might have died before the (complete) header was written
@@ -481,48 +428,19 @@ public class DfsLogger implements Comparable<DfsLogger> {
       sync = logFile.getClass().getMethod("hsync");
       flush = logFile.getClass().getMethod("hflush");
 
-      // Initialize the crypto operations.
-      // @formatter:off
-      org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule =
-        org.apache.accumulo.core.security.crypto.CryptoModuleFactory
-          .getCryptoModule(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
-      // @formatter:on
-
-      // Initialize the log file with a header and the crypto params used to set up this log file.
-      logFile.write(LOG_FILE_HEADER_V3.getBytes(UTF_8));
-
-      CryptoModuleParameters params = CryptoModuleFactory
-          .createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
-      // Immediately update to the correct cipher. Doing this here keeps the CryptoModule
-      // independent of the writers using it
-      if (params.getAllOptions().get(Property.CRYPTO_WAL_CIPHER_SUITE.getKey()) != null
-          && !params.getAllOptions().get(Property.CRYPTO_WAL_CIPHER_SUITE.getKey()).equals("")) {
-        params
-            .setCipherSuite(params.getAllOptions().get(Property.CRYPTO_WAL_CIPHER_SUITE.getKey()));
-        params.getAllOptions().put(Property.CRYPTO_CIPHER_SUITE.getKey(), params.getCipherSuite());
-      }
-
-      NoFlushOutputStream nfos = new NoFlushOutputStream(logFile);
-      params.setPlaintextOutputStream(nfos);
+      // Initialize the log file with a header and its encryption
+      CryptoService cryptoService = CryptoServiceFactory.getConfigured(conf.getConfiguration());
+      logFile.write(LOG_FILE_HEADER_V4.getBytes(UTF_8));
 
-      // In order to bootstrap the reading of this file later, we have to record the CryptoModule
-      // that was used to encipher it here,
-      // so that that crypto module can re-read its own parameters.
+      log.debug("Using {} for encrypting WAL {}", cryptoService.getClass().getSimpleName(),
+          filename);
+      CryptoEnvironment env = new CryptoEnvironmentImpl(Scope.WAL, null);
+      FileEncrypter encrypter = cryptoService.getFileEncrypter(env);
+      byte[] cryptoParams = encrypter.getDecryptionParameters();
+      CryptoUtils.writeParams(cryptoParams, logFile);
 
-      logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
-
-      params = cryptoModule.getEncryptingOutputStream(params);
-      OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
-
-      // If the module just kicks back our original stream, then just use it, don't wrap it in
-      // another data OutputStream.
-      if (encipheringOutputStream == nfos) {
-        log.debug("No enciphering, using raw output stream");
-        encryptingLogFile = nfos;
-      } else {
-        log.debug("Enciphering found, wrapping in DataOutputStream");
-        encryptingLogFile = new DataOutputStream(encipheringOutputStream);
-      }
+      encryptingLogFile = new DataOutputStream(
+          encrypter.encryptStream(new NoFlushOutputStream(logFile)));
 
       LogFileKey key = new LogFileKey();
       key.event = OPEN;
diff --git a/test/src/main/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
index 705c25a..9503436 100644
--- a/test/src/main/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@ -177,8 +177,8 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacBase {
 
     // Write half of the header
     FSDataOutputStream wal = fs.create(new Path(partialHeaderWalog.toURI()));
-    wal.write(DfsLogger.LOG_FILE_HEADER_V3.getBytes(UTF_8), 0,
-        DfsLogger.LOG_FILE_HEADER_V3.length() / 2);
+    wal.write(DfsLogger.LOG_FILE_HEADER_V4.getBytes(UTF_8), 0,
+        DfsLogger.LOG_FILE_HEADER_V4.length() / 2);
     wal.close();
 
     Assert.assertTrue("root user did not have write permission to metadata table",
diff --git a/test/src/main/java/org/apache/accumulo/test/ShellConfigIT.java b/test/src/main/java/org/apache/accumulo/test/ShellConfigIT.java
index acd39be..6c1d2e8 100644
--- a/test/src/main/java/org/apache/accumulo/test/ShellConfigIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ShellConfigIT.java
@@ -103,11 +103,12 @@ public class ShellConfigIT extends AccumuloClusterHarness {
       Assert.fail("Unknown token type");
     }
 
-    assertTrue(Property.CRYPTO_CIPHER_KEY_ALGORITHM_NAME.isExperimental());
+    assertTrue(Property.INSTANCE_CRYPTO_PREFIX.isExperimental());
+    assertTrue(Property.INSTANCE_CRYPTO_SERVICE.isExperimental());
 
     String configOutput = ts.exec("config");
 
     assertTrue(configOutput.contains(PerTableVolumeChooser.TABLE_VOLUME_CHOOSER));
-    assertFalse(configOutput.contains(Property.CRYPTO_CIPHER_KEY_ALGORITHM_NAME.getKey()));
+    assertFalse(configOutput.contains(Property.INSTANCE_CRYPTO_SERVICE.getKey()));
   }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
index 4ba085e..56fd304 100644
--- a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
@@ -91,7 +91,7 @@ public class VolumeIT extends ConfigurableMacBase {
 
   @Override
   protected int defaultTimeoutSeconds() {
-    return 5 * 60;
+    return 10 * 60;
   }
 
   @SuppressWarnings("deprecation")
@@ -164,7 +164,7 @@ public class VolumeIT extends ConfigurableMacBase {
       assertEquals(1, diskUsage.size());
       long usage = diskUsage.get(0).getUsage();
       log.debug("usage {}", usage);
-      assertTrue(usage > 700 && usage < 800);
+      assertTrue(usage > 700 && usage < 900);
     }
   }
 
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogEncryptedIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogEncryptedIT.java
new file mode 100644
index 0000000..1b263de
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogEncryptedIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.core.conf.Property.INSTANCE_CRYPTO_PREFIX;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WriteAheadLogEncryptedIT extends AccumuloClusterHarness {
+
+  private static final Logger log = LoggerFactory.getLogger(WriteAheadLogEncryptedIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    String keyPath = System.getProperty("user.dir")
+        + "/target/mini-tests/WriteAheadLogEncryptedIT-testkeyfile";
+    cfg.setProperty(Property.INSTANCE_CRYPTO_SERVICE,
+        "org.apache.accumulo.core.security.crypto.impl.AESCryptoService");
+    cfg.setProperty(INSTANCE_CRYPTO_PREFIX.getKey() + "kekId", keyPath);
+    cfg.setProperty(INSTANCE_CRYPTO_PREFIX.getKey() + "keyManager", "uri");
+
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
+    cfg.setProperty(Property.GC_CYCLE_START, "1");
+    cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
+    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+
+    // setup key file
+    try {
+      Path keyFile = new Path(keyPath);
+      FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
+      fs.delete(keyFile, true);
+      if (fs.createNewFile(keyFile))
+        log.info("Created keyfile at {}", keyPath);
+      else
+        log.error("Failed to create key file at {}", keyPath);
+
+      try (FSDataOutputStream out = fs.create(keyFile)) {
+        out.writeUTF("sixteenbytekey"); // 14 + 2 from writeUTF
+      }
+    } catch (Exception e) {
+      log.error("Exception during configure", e);
+    }
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 10 * 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "750K");
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    opts.setTableName(tableName);
+    opts.setClientInfo(getClientInfo());
+    vopts.setClientInfo(getClientInfo());
+
+    TestIngest.ingest(c, opts, new BatchWriterOpts());
+    vopts.setTableName(tableName);
+    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
+    getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+    getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
+  }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java b/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
index 2e51cc6..4d0e276 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
@@ -101,7 +101,7 @@ public class UnusedWalDoesntCloseReplicationStatusIT extends ConfigurableMacBase
     // Make a fake WAL with no data in it for our real table
     FSDataOutputStream out = fs.create(new Path(tserverWal.getAbsolutePath()));
 
-    out.write(DfsLogger.LOG_FILE_HEADER_V3.getBytes(UTF_8));
+    out.write(DfsLogger.LOG_FILE_HEADER_V4.getBytes(UTF_8));
 
     DataOutputStream dos = new DataOutputStream(out);
     dos.writeUTF("NullCryptoModule");