You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/11/26 18:51:48 UTC

[GitHub] milleruntime closed pull request #765: MR Improvements Closes #753 #751

milleruntime closed pull request #765: MR Improvements Closes #753 #751
URL: https://github.com/apache/accumulo/pull/765
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java
index d44219d285..ff531b96d0 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java
@@ -16,24 +16,16 @@
  */
 package org.apache.accumulo.hadoop.mapred;
 
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setCompressionType;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setDataBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setFileBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setIndexBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setReplication;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSampler;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSummarizers;
-
 import java.io.IOException;
 
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
+import org.apache.accumulo.hadoop.mapreduce.FileOutputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputFormatBuilderImpl;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator;
 import org.apache.hadoop.conf.Configuration;
@@ -46,18 +38,7 @@
 import org.apache.hadoop.util.Progressable;
 
 /**
- * This class allows MapReduce jobs to write output in the Accumulo data file format.<br>
- * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important
- * requirement of Accumulo data files.
- *
- * <p>
- * The output path to be created must be specified via {@link #setInfo(JobConf, FileOutputInfo)}
- * using {@link FileOutputInfo#builder()}.outputPath(path). For all available options see
- * {@link FileOutputInfo#builder()}
- * <p>
- * Methods inherited from {@link FileOutputFormat} are not supported and may be ignored or cause
- * failures. Using other Hadoop configuration options that affect the behavior of the underlying
- * files directly in the Job's configuration may work, but are not directly supported at this time.
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat
  *
  * @since 2.0
  */
@@ -100,22 +81,8 @@ public void write(Key key, Value value) throws IOException {
   /**
    * Sets all the information required for this map reduce job.
    */
-  public static void setInfo(JobConf job, FileOutputInfo info) {
-    setOutputPath(job, info.getOutputPath());
-    if (info.getCompressionType().isPresent())
-      setCompressionType(job, info.getCompressionType().get());
-    if (info.getDataBlockSize().isPresent())
-      setDataBlockSize(job, info.getDataBlockSize().get());
-    if (info.getFileBlockSize().isPresent())
-      setFileBlockSize(job, info.getFileBlockSize().get());
-    if (info.getIndexBlockSize().isPresent())
-      setIndexBlockSize(job, info.getIndexBlockSize().get());
-    if (info.getReplication().isPresent())
-      setReplication(job, info.getReplication().get());
-    if (info.getSampler().isPresent())
-      setSampler(job, info.getSampler().get());
-    if (info.getSummarizers().size() > 0)
-      setSummarizers(job, info.getSummarizers().toArray(new SummarizerConfiguration[0]));
+  public static FileOutputFormatBuilder.PathParams<JobConf> configure() {
+    return new FileOutputFormatBuilderImpl<JobConf>();
   }
 
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
index a344a4b610..92df89d1b0 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
@@ -16,29 +16,16 @@
  */
 package org.apache.accumulo.hadoop.mapred;
 
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation;
-
 import java.io.IOException;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
 import org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat;
 import org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.RecordReaderBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -48,17 +35,7 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * This class allows MapReduce jobs to use Accumulo as the source of data. This
- * {@link org.apache.hadoop.mapred.InputFormat} provides keys and values of type {@link Key} and
- * {@link Value} to the Map function.
- *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloInputFormat#setInfo(JobConf, InputInfo)}
- * </ul>
- *
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat
  *
  * @since 2.0
  */
@@ -115,28 +92,10 @@ public Value createValue() {
     return recordReader;
   }
 
-  public static void setInfo(JobConf job, InputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    setScanAuthorizations(job, info.getScanAuths());
-    setInputTableName(job, info.getTableName());
-
-    // all optional values
-    if (info.getContext().isPresent())
-      setClassLoaderContext(job, info.getContext().get());
-    if (info.getRanges().size() > 0)
-      setRanges(job, info.getRanges());
-    if (info.getIterators().size() > 0)
-      InputConfigurator.writeIteratorsToConf(CLASS, job, info.getIterators());
-    if (info.getFetchColumns().size() > 0)
-      InputConfigurator.fetchColumns(CLASS, job, info.getFetchColumns());
-    if (info.getSamplerConfig().isPresent())
-      setSamplerConfiguration(job, info.getSamplerConfig().get());
-    if (info.getExecutionHints().size() > 0)
-      setExecutionHints(job, info.getExecutionHints());
-    setAutoAdjustRanges(job, info.isAutoAdjustRanges());
-    setScanIsolation(job, info.isScanIsolation());
-    setLocalIterators(job, info.isLocalIterators());
-    setOfflineTableScan(job, info.isOfflineScan());
-    setBatchScan(job, info.isBatchScan());
+  /**
+   * Sets all the information required for this map reduce job.
+   */
+  public static InputFormatBuilder.ClientParams<JobConf> configure() {
+    return new InputFormatBuilderImpl<JobConf>(CLASS);
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
index dca05a44b6..47864fa7fe 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
@@ -17,11 +17,6 @@
 package org.apache.accumulo.hadoop.mapred;
 
 import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.getClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setBatchWriterOptions;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setCreateTables;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setDefaultTableName;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setSimulationMode;
 
 import java.io.IOException;
 
@@ -32,8 +27,9 @@
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
+import org.apache.accumulo.hadoop.mapreduce.OutputFormatBuilder;
 import org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl;
+import org.apache.accumulo.hadoopImpl.mapreduce.OutputFormatBuilderImpl;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -42,15 +38,7 @@
 import org.apache.hadoop.util.Progressable;
 
 /**
- * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat}
- * accepts keys and values of type {@link Text} (for a table name) and {@link Mutation} from the Map
- * and Reduce functions.
- *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloOutputFormat#setInfo(JobConf, OutputInfo)}
- * </ul>
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat
  *
  * @since 2.0
  */
@@ -81,14 +69,7 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException
     }
   }
 
-  public static void setInfo(JobConf job, OutputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    if (info.getBatchWriterOptions().isPresent())
-      setBatchWriterOptions(job, info.getBatchWriterOptions().get());
-    if (info.getDefaultTableName().isPresent())
-      setDefaultTableName(job, info.getDefaultTableName().get());
-    setCreateTables(job, info.isCreateTables());
-    setSimulationMode(job, info.isSimulationMode());
+  public static OutputFormatBuilder.ClientParams<JobConf> configure() {
+    return new OutputFormatBuilderImpl<JobConf>();
   }
-
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
index e823d8a5f1..c1c6cbe2fc 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
@@ -16,20 +16,6 @@
  */
 package org.apache.accumulo.hadoop.mapred;
 
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.fetchColumns;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation;
-
 import java.io.IOException;
 import java.util.Map.Entry;
 
@@ -37,10 +23,10 @@
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.PeekingIterator;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
 import org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat;
 import org.apache.accumulo.hadoopImpl.mapred.InputFormatBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -49,22 +35,12 @@
 import org.apache.hadoop.mapred.Reporter;
 
 /**
- * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat}
- * provides row names as {@link Text} as keys, and a corresponding {@link PeekingIterator} as a
- * value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map
- * function.
- *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloRowInputFormat#setInfo(JobConf, InputInfo)}
- * </ul>
- *
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloRowInputFormat
  *
  * @since 2.0
  */
 public class AccumuloRowInputFormat implements InputFormat<Text,PeekingIterator<Entry<Key,Value>>> {
+  private static final Class<AccumuloRowInputFormat> CLASS = AccumuloRowInputFormat.class;
 
   /**
    * Gets the splits of the tables that have been set on the job by reading the metadata table for
@@ -123,29 +99,7 @@ public Text createKey() {
   /**
    * Sets all the information required for this map reduce job.
    */
-  public static void setInfo(JobConf job, InputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    setScanAuthorizations(job, info.getScanAuths());
-    setInputTableName(job, info.getTableName());
-
-    // all optional values
-    if (info.getContext().isPresent())
-      setClassLoaderContext(job, info.getContext().get());
-    if (info.getRanges().size() > 0)
-      setRanges(job, info.getRanges());
-    if (info.getIterators().size() > 0)
-      InputConfigurator.writeIteratorsToConf(AccumuloRowInputFormat.class, job,
-          info.getIterators());
-    if (info.getFetchColumns().size() > 0)
-      fetchColumns(job, info.getFetchColumns());
-    if (info.getSamplerConfig().isPresent())
-      setSamplerConfiguration(job, info.getSamplerConfig().get());
-    if (info.getExecutionHints().size() > 0)
-      setExecutionHints(job, info.getExecutionHints());
-    setAutoAdjustRanges(job, info.isAutoAdjustRanges());
-    setScanIsolation(job, info.isScanIsolation());
-    setLocalIterators(job, info.isLocalIterators());
-    setOfflineTableScan(job, info.isOfflineScan());
-    setBatchScan(job, info.isBatchScan());
+  public static InputFormatBuilder.ClientParams<JobConf> configure() {
+    return new InputFormatBuilderImpl<JobConf>(CLASS);
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java
index 26f559efcb..2f869e9773 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java
@@ -16,23 +16,15 @@
  */
 package org.apache.accumulo.hadoop.mapreduce;
 
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setCompressionType;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setDataBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setFileBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setIndexBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setReplication;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSampler;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSummarizers;
-
 import java.io.IOException;
 
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputFormatBuilderImpl;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator;
 import org.apache.hadoop.conf.Configuration;
@@ -45,16 +37,21 @@
 /**
  * This class allows MapReduce jobs to write output in the Accumulo data file format.<br>
  * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important
- * requirement of Accumulo data files.
+ * requirement of Accumulo data files. The output path to be created must be specified via
+ * {@link #configure()}, which uses a fluent API. For Example:
+ *
+ * <pre>
+ * AccumuloFileOutputFormat.configure()
+ *      .outputPath(path)
+ *      .fileBlockSize(b)
+ *      .compression(type)
+ *      .summarizers(sc1, sc2).store(job));
+ * </pre>
  *
- * <p>
- * The output path to be created must be specified via {@link #setInfo(Job, FileOutputInfo)} using
- * {@link FileOutputInfo#builder()}.outputPath(path). For all available options see
- * {@link FileOutputInfo#builder()}
- * <p>
- * Methods inherited from {@link FileOutputFormat} are not supported and may be ignored or cause
- * failures. Using other Hadoop configuration options that affect the behavior of the underlying
- * files directly in the Job's configuration may work, but are not directly supported at this time.
+ * For all available options see {@link FileOutputFormatBuilder}. Methods inherited from
+ * {@link FileOutputFormat} are not supported and may be ignored or cause failures. Using other
+ * Hadoop configuration options that affect the behavior of the underlying files directly in the
+ * Job's configuration may work, but are not directly supported at this time.
  *
  * @since 2.0
  */
@@ -95,22 +92,8 @@ public void write(Key key, Value value) throws IOException {
   /**
    * Sets all the information required for this map reduce job.
    */
-  public static void setInfo(Job job, FileOutputInfo info) {
-    setOutputPath(job, info.getOutputPath());
-    if (info.getCompressionType().isPresent())
-      setCompressionType(job, info.getCompressionType().get());
-    if (info.getDataBlockSize().isPresent())
-      setDataBlockSize(job, info.getDataBlockSize().get());
-    if (info.getFileBlockSize().isPresent())
-      setFileBlockSize(job, info.getFileBlockSize().get());
-    if (info.getIndexBlockSize().isPresent())
-      setIndexBlockSize(job, info.getIndexBlockSize().get());
-    if (info.getReplication().isPresent())
-      setReplication(job, info.getReplication().get());
-    if (info.getSampler().isPresent())
-      setSampler(job, info.getSampler().get());
-    if (info.getSummarizers().size() > 0)
-      setSummarizers(job, info.getSummarizers().toArray(new SummarizerConfiguration[0]));
+  public static FileOutputFormatBuilder.PathParams<Job> configure() {
+    return new FileOutputFormatBuilderImpl<Job>();
   }
 
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
index fa64a5a4e2..3c391a858d 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
@@ -16,19 +16,6 @@
  */
 package org.apache.accumulo.hadoop.mapreduce;
 
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setScanIsolation;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map.Entry;
@@ -38,7 +25,7 @@
 import org.apache.accumulo.core.util.format.DefaultFormatter;
 import org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat;
 import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -50,15 +37,20 @@
 
 /**
  * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat}
- * provides keys and values of type {@link Key} and {@link Value} to the Map function.
- *
- * The user must specify the following via static configurator method:
+ * provides keys and values of type {@link Key} and {@link Value} to the Map function. Configure the
+ * job using the {@link #configure()} method, which provides a fluent API. For Example:
  *
- * <ul>
- * <li>{@link AccumuloInputFormat#setInfo(Job, InputInfo)}
- * </ul>
+ * <pre>
+ * AccumuloInputFormat.configure().clientInfo(info).table(name).auths(auths) // required
+ *     .addIterator(iter1).ranges(ranges).fetchColumns(columns).executionHints(hints)
+ *     .samplerConfiguration(sampleConf).disableAutoAdjustRanges() // enabled by default
+ *     .scanIsolation() // not available with batchScan()
+ *     .offlineScan() // not available with batchScan()
+ *     .store(job);
+ * </pre>
  *
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * For descriptions of all options see
+ * {@link org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions}
  *
  * @since 2.0
  */
@@ -103,28 +95,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
   /**
    * Sets all the information required for this map reduce job.
    */
-  public static void setInfo(Job job, InputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    setScanAuthorizations(job, info.getScanAuths());
-    setInputTableName(job, info.getTableName());
-
-    // all optional values
-    if (info.getContext().isPresent())
-      setClassLoaderContext(job, info.getContext().get());
-    if (info.getRanges().size() > 0)
-      setRanges(job, info.getRanges());
-    if (info.getIterators().size() > 0)
-      InputConfigurator.writeIteratorsToConf(CLASS, job.getConfiguration(), info.getIterators());
-    if (info.getFetchColumns().size() > 0)
-      InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), info.getFetchColumns());
-    if (info.getSamplerConfig().isPresent())
-      setSamplerConfiguration(job, info.getSamplerConfig().get());
-    if (info.getExecutionHints().size() > 0)
-      setExecutionHints(job, info.getExecutionHints());
-    setAutoAdjustRanges(job, info.isAutoAdjustRanges());
-    setScanIsolation(job, info.isScanIsolation());
-    setLocalIterators(job, info.isLocalIterators());
-    setOfflineTableScan(job, info.isOfflineScan());
-    setBatchScan(job, info.isBatchScan());
+  public static InputFormatBuilder.ClientParams<Job> configure() {
+    return new InputFormatBuilderImpl<>(CLASS);
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
index a285988103..565de0e204 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
@@ -17,11 +17,6 @@
 package org.apache.accumulo.hadoop.mapreduce;
 
 import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.getClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setBatchWriterOptions;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setCreateTables;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setDefaultTableName;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setSimulationMode;
 
 import java.io.IOException;
 
@@ -33,6 +28,7 @@
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl;
+import org.apache.accumulo.hadoopImpl.mapreduce.OutputFormatBuilderImpl;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -45,13 +41,15 @@
 /**
  * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat}
  * accepts keys and values of type {@link Text} (for a table name) and {@link Mutation} from the Map
- * and Reduce functions.
+ * and Reduce functions. Configured with fluent API using {@link AccumuloOutputFormat#configure()}.
+ * Here is an example with all possible options:
  *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloOutputFormat#setInfo(Job, OutputInfo)}
- * </ul>
+ * <pre>
+ * AccumuloOutputFormat.configure().clientInfo(clientInfo).batchWriterOptions(bwConfig)
+ *     .defaultTable(name).createTables() // disabled by default
+ *     .simulationMode() // disabled by default
+ *     .store(job);
+ * </pre>
  *
  * @since 2.0
  */
@@ -88,14 +86,11 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
     }
   }
 
-  public static void setInfo(Job job, OutputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    if (info.getBatchWriterOptions().isPresent())
-      setBatchWriterOptions(job, info.getBatchWriterOptions().get());
-    if (info.getDefaultTableName().isPresent())
-      setDefaultTableName(job, info.getDefaultTableName().get());
-    setCreateTables(job, info.isCreateTables());
-    setSimulationMode(job, info.isSimulationMode());
+  /**
+   * Sets all the information required for this map reduce job.
+   */
+  public static OutputFormatBuilder.ClientParams<Job> configure() {
+    return new OutputFormatBuilderImpl<Job>();
   }
 
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java
index d7bba69b1a..899eb28292 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java
@@ -16,19 +16,6 @@
  */
 package org.apache.accumulo.hadoop.mapreduce;
 
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setScanIsolation;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map.Entry;
@@ -39,7 +26,7 @@
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat;
 import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,15 +39,20 @@
  * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat}
  * provides row names as {@link Text} as keys, and a corresponding {@link PeekingIterator} as a
  * value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map
- * function.
- *
- * The user must specify the following via static configurator method:
+ * function. Configure the job using the {@link #configure()} method, which provides a fluent API.
+ * For Example:
  *
- * <ul>
- * <li>{@link AccumuloRowInputFormat#setInfo(Job, InputInfo)}
- * </ul>
+ * <pre>
+ * AccumuloRowInputFormat.configure().clientInfo(info).table(name).auths(auths) // required
+ *     .addIterator(iter1).ranges(ranges).fetchColumns(columns).executionHints(hints)
+ *     .samplerConfiguration(sampleConf).disableAutoAdjustRanges() // enabled by default
+ *     .scanIsolation() // not available with batchScan()
+ *     .offlineScan() // not available with batchScan()
+ *     .store(job);
+ * </pre>
  *
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * For descriptions of all options see
+ * {@link org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions}
  *
  * @since 2.0
  */
@@ -111,28 +103,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
   /**
    * Sets all the information required for this map reduce job.
    */
-  public static void setInfo(Job job, InputInfo info) {
-    setClientInfo(job, info.getClientInfo());
-    setScanAuthorizations(job, info.getScanAuths());
-    setInputTableName(job, info.getTableName());
-
-    // all optional values
-    if (info.getContext().isPresent())
-      setClassLoaderContext(job, info.getContext().get());
-    if (info.getRanges().size() > 0)
-      setRanges(job, info.getRanges());
-    if (info.getIterators().size() > 0)
-      InputConfigurator.writeIteratorsToConf(CLASS, job.getConfiguration(), info.getIterators());
-    if (info.getFetchColumns().size() > 0)
-      InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), info.getFetchColumns());
-    if (info.getSamplerConfig().isPresent())
-      setSamplerConfiguration(job, info.getSamplerConfig().get());
-    if (info.getExecutionHints().size() > 0)
-      setExecutionHints(job, info.getExecutionHints());
-    setAutoAdjustRanges(job, info.isAutoAdjustRanges());
-    setScanIsolation(job, info.isScanIsolation());
-    setLocalIterators(job, info.isLocalIterators());
-    setOfflineTableScan(job, info.isOfflineScan());
-    setBatchScan(job, info.isBatchScan());
+  public static InputFormatBuilder.ClientParams<Job> configure() {
+    return new InputFormatBuilderImpl<Job>(CLASS);
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputFormatBuilder.java
new file mode 100644
index 0000000000..d4e4fc8fd1
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputFormatBuilder.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Builder for all the information needed for the Map Reduce job. Fluent API used by
+ * {@link AccumuloFileOutputFormat#configure()}
+ *
+ * @since 2.0
+ */
+public interface FileOutputFormatBuilder {
+  /**
+   * Required params for builder
+   *
+   * @since 2.0
+   */
+  interface PathParams<T> {
+    /**
+     * Set the Path of the output directory for the map-reduce job.
+     */
+    OutputOptions<T> outputPath(Path path);
+  }
+
+  /**
+   * Options for builder
+   *
+   * @since 2.0
+   */
+  interface OutputOptions<T> {
+    /**
+     * Sets the compression type to use for data blocks, overriding the default. Specifying a
+     * compression may require additional libraries to be available to your Job.
+     *
+     * @param compressionType
+     *          one of "none", "gz", "lzo", or "snappy"
+     */
+    OutputOptions<T> compression(String compressionType);
+
+    /**
+     * Sets the size for data blocks within each file.<br>
+     * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed
+     * as a group.
+     *
+     * <p>
+     * Making this value smaller may increase seek performance, but at the cost of increasing the
+     * size of the indexes (which can also affect seek performance).
+     *
+     * @param dataBlockSize
+     *          the block size, in bytes
+     */
+    OutputOptions<T> dataBlockSize(long dataBlockSize);
+
+    /**
+     * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by
+     * the underlying file system.
+     *
+     * @param fileBlockSize
+     *          the block size, in bytes
+     */
+    OutputOptions<T> fileBlockSize(long fileBlockSize);
+
+    /**
+     * Sets the size for index blocks within each file; smaller blocks means a deeper index
+     * hierarchy within the file, while larger blocks mean a more shallow index hierarchy within the
+     * file. This can affect the performance of queries.
+     *
+     * @param indexBlockSize
+     *          the block size, in bytes
+     */
+    OutputOptions<T> indexBlockSize(long indexBlockSize);
+
+    /**
+     * Sets the file system replication factor for the resulting file, overriding the file system
+     * default.
+     *
+     * @param replication
+     *          the number of replicas for produced files
+     */
+    OutputOptions<T> replication(int replication);
+
+    /**
+     * Specify a sampler to be used when writing out data. This will result in the output file
+     * having sample data.
+     *
+     * @param samplerConfig
+     *          The configuration for creating sample data in the output file.
+     */
+    OutputOptions<T> sampler(SamplerConfiguration samplerConfig);
+
+    /**
+     * Specifies a list of summarizer configurations to create summary data in the output file. Each
+     * Key Value written will be passed to the configured
+     * {@link org.apache.accumulo.core.client.summary.Summarizer}'s.
+     *
+     * @param summarizerConfigs
+     *          summarizer configurations
+     */
+    OutputOptions<T> summarizers(SummarizerConfiguration... summarizerConfigs);
+
+    /**
+     * Finish configuring, verify and serialize options into the Job or JobConf
+     */
+    void store(T job);
+  }
+
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java
deleted file mode 100644
index 70b60433db..0000000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoop.mapreduce;
-
-import java.util.Collection;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
-import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputInfoImpl;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Object containing all the information needed for the Map Reduce job. This object is passed to
- * {@link AccumuloFileOutputFormat#setInfo(Job, FileOutputInfo)}. It uses a fluent API like so:
- *
- * <pre>
- * FileOutputInfo.builder()
- *      .outputPath(path)
- *      .fileBlockSize(b)
- *      .compressionType(type)
- *      .summarizers(sc1, sc2).build());
- * </pre>
- *
- * @since 2.0
- */
-public interface FileOutputInfo {
-
-  /**
-   * @return the output path set using FileOutputInfo.builder()...outputPath(path)
-   */
-  public Path getOutputPath();
-
-  /**
-   * @return the compression if set using FileOutputInfo.builder()...compressionType(type)
-   */
-  public Optional<String> getCompressionType();
-
-  /**
-   * @return the data block size if set using FileOutputInfo.builder()...dataBlockSize(size)
-   */
-  public Optional<Long> getDataBlockSize();
-
-  /**
-   * @return the file block size if set using FileOutputInfo.builder()...fileBlockSize(size)
-   */
-  public Optional<Long> getFileBlockSize();
-
-  /**
-   * @return the index block size if set using FileOutputInfo.builder()...indexBlockSize(size)
-   */
-  public Optional<Long> getIndexBlockSize();
-
-  /**
-   * @return the replication if set using FileOutputInfo.builder()...replication(num)
-   */
-  public Optional<Integer> getReplication();
-
-  /**
-   * @return the SamplerConfiguration if set using FileOutputInfo.builder()...sampler(conf)
-   */
-  public Optional<SamplerConfiguration> getSampler();
-
-  /**
-   * @return the summarizers set using FileOutputInfo.builder()...summarizers(conf1, conf2...)
-   */
-  public Collection<SummarizerConfiguration> getSummarizers();
-
-  /**
-   * @return builder for creating a {@link FileOutputInfo}
-   */
-  public static FileOutputInfoBuilder.PathParams builder() {
-    return new FileOutputInfoImpl.FileOutputInfoBuilderImpl();
-  }
-
-  /**
-   * Fluent API builder for FileOutputInfo
-   *
-   * @since 2.0
-   */
-  interface FileOutputInfoBuilder {
-
-    /**
-     * Required params for builder
-     *
-     * @since 2.0
-     */
-    interface PathParams {
-      /**
-       * Set the Path of the output directory for the map-reduce job.
-       */
-      OutputOptions outputPath(Path path);
-    }
-
-    /**
-     * Options for builder
-     *
-     * @since 2.0
-     */
-    interface OutputOptions {
-      /**
-       * Sets the compression type to use for data blocks, overriding the default. Specifying a
-       * compression may require additional libraries to be available to your Job.
-       *
-       * @param compressionType
-       *          one of "none", "gz", "lzo", or "snappy"
-       */
-      OutputOptions compressionType(String compressionType);
-
-      /**
-       * Sets the size for data blocks within each file.<br>
-       * Data blocks are a span of key/value pairs stored in the file that are compressed and
-       * indexed as a group.
-       *
-       * <p>
-       * Making this value smaller may increase seek performance, but at the cost of increasing the
-       * size of the indexes (which can also affect seek performance).
-       *
-       * @param dataBlockSize
-       *          the block size, in bytes
-       */
-      OutputOptions dataBlockSize(long dataBlockSize);
-
-      /**
-       * Sets the size for file blocks in the file system; file blocks are managed, and replicated,
-       * by the underlying file system.
-       *
-       * @param fileBlockSize
-       *          the block size, in bytes
-       */
-      OutputOptions fileBlockSize(long fileBlockSize);
-
-      /**
-       * Sets the size for index blocks within each file; smaller blocks means a deeper index
-       * hierarchy within the file, while larger blocks mean a more shallow index hierarchy within
-       * the file. This can affect the performance of queries.
-       *
-       * @param indexBlockSize
-       *          the block size, in bytes
-       */
-      OutputOptions indexBlockSize(long indexBlockSize);
-
-      /**
-       * Sets the file system replication factor for the resulting file, overriding the file system
-       * default.
-       *
-       * @param replication
-       *          the number of replicas for produced files
-       */
-      OutputOptions replication(int replication);
-
-      /**
-       * Specify a sampler to be used when writing out data. This will result in the output file
-       * having sample data.
-       *
-       * @param samplerConfig
-       *          The configuration for creating sample data in the output file.
-       */
-      OutputOptions sampler(SamplerConfiguration samplerConfig);
-
-      /**
-       * Specifies a list of summarizer configurations to create summary data in the output file.
-       * Each Key Value written will be passed to the configured
-       * {@link org.apache.accumulo.core.client.summary.Summarizer}'s.
-       *
-       * @param summarizerConfigs
-       *          summarizer configurations
-       */
-      OutputOptions summarizers(SummarizerConfiguration... summarizerConfigs);
-
-      /**
-       * @return newly created {@link FileOutputInfo}
-       */
-      FileOutputInfo build();
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
new file mode 100644
index 0000000000..1751d9fd2f
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+
+/**
+ * Builder for all the information needed for the Map Reduce job. Fluent API used by
+ * {@link AccumuloInputFormat#configure()}
+ *
+ * @since 2.0
+ */
+public interface InputFormatBuilder {
+  /**
+   * Required params for builder
+   *
+   * @since 2.0
+   */
+  interface ClientParams<T> {
+    /**
+     * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
+     * param can be created using {@link ClientInfo#from(Properties)}
+     *
+     * @param clientInfo
+     *          Accumulo connection information
+     */
+    TableParams<T> clientInfo(ClientInfo clientInfo);
+  }
+
+  /**
+   * Required params for builder
+   *
+   * @since 2.0
+   */
+  interface TableParams<T> {
+    /**
+     * Sets the name of the input table, over which this job will scan.
+     *
+     * @param tableName
+     *          the table to use when the tablename is null in the write call
+     */
+    AuthsParams<T> table(String tableName);
+  }
+
+  /**
+   * Required params for builder
+   *
+   * @since 2.0
+   */
+  interface AuthsParams<T> {
+    /**
+     * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations.
+     * If none present use {@link Authorizations#EMPTY}
+     *
+     * @param auths
+     *          the user's authorizations
+     */
+    InputFormatOptions<T> auths(Authorizations auths);
+  }
+
+  /**
+   * Options for batch scan
+   *
+   * @since 2.0
+   */
+  interface BatchScanOptions<T> {
+    /**
+     * Finish configuring, verify and store options into the JobConf or Job
+     */
+    void store(T t);
+  }
+
+  /**
+   * Options for scan
+   *
+   * @since 2.0
+   */
+  interface ScanOptions<T> extends BatchScanOptions<T> {
+    /**
+     * @see InputFormatOptions#scanIsolation()
+     */
+    ScanOptions<T> scanIsolation();
+
+    /**
+     * @see InputFormatOptions#localIterators()
+     */
+    ScanOptions<T> localIterators();
+
+    /**
+     * @see InputFormatOptions#offlineScan()
+     */
+    ScanOptions<T> offlineScan();
+  }
+
+  /**
+   * Optional values to set using fluent API
+   *
+   * @since 2.0
+   */
+  interface InputFormatOptions<T> {
+    /**
+     * Sets the name of the classloader context on this scanner
+     *
+     * @param context
+     *          name of the classloader context
+     */
+    InputFormatOptions<T> classLoaderContext(String context);
+
+    /**
+     * Sets the input ranges to scan for the single input table associated with this job.
+     *
+     * @param ranges
+     *          the ranges that will be mapped over
+     * @see TableOperations#splitRangeByTablets(String, Range, int)
+     */
+    InputFormatOptions<T> ranges(Collection<Range> ranges);
+
+    /**
+     * Restricts the columns that will be mapped over for this job for the default input table.
+     *
+     * @param fetchColumns
+     *          a collection of IteratorSetting.Column objects corresponding to column family and
+     *          column qualifier. If the column qualifier is null, the entire column family is
+     *          selected. An empty set is the default and is equivalent to scanning all columns.
+     */
+    InputFormatOptions<T> fetchColumns(Collection<IteratorSetting.Column> fetchColumns);
+
+    /**
+     * Encode an iterator on the single input table for this job. It is safe to call this method
+     * multiple times. If an iterator is added with the same name, it will be overridden.
+     *
+     * @param cfg
+     *          the configuration of the iterator
+     */
+    InputFormatOptions<T> addIterator(IteratorSetting cfg);
+
+    /**
+     * Set these execution hints on scanners created for input splits. See
+     * {@link ScannerBase#setExecutionHints(java.util.Map)}
+     */
+    InputFormatOptions<T> executionHints(Map<String,String> hints);
+
+    /**
+     * Causes input format to read sample data. If sample data was created using a different
+     * configuration or a tables sampler configuration changes while reading data, then the input
+     * format will throw an error.
+     *
+     * @param samplerConfig
+     *          The sampler configuration that sample must have been created with inorder for
+     *          reading sample data to succeed.
+     *
+     * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
+     */
+    InputFormatOptions<T> samplerConfiguration(SamplerConfiguration samplerConfig);
+
+    /**
+     * Disables the automatic adjustment of ranges for this job. This feature merges overlapping
+     * ranges, then splits them to align with tablet boundaries. Disabling this feature will cause
+     * exactly one Map task to be created for each specified range. Disabling has no effect for
+     * batch scans at it will always automatically adjust ranges.
+     * <p>
+     * By default, this feature is <b>enabled</b>.
+     *
+     * @see #ranges(Collection)
+     */
+    InputFormatOptions<T> disableAutoAdjustRanges();
+
+    /**
+     * Enables the use of the {@link IsolatedScanner} in this job.
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    ScanOptions<T> scanIsolation();
+
+    /**
+     * Enables the use of the {@link ClientSideIteratorScanner} in this job. This feature will cause
+     * the iterator stack to be constructed within the Map task, rather than within the Accumulo
+     * TServer. To use this feature, all classes needed for those iterators must be available on the
+     * classpath for the task.
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    ScanOptions<T> localIterators();
+
+    /**
+     * Enable reading offline tables. By default, this feature is disabled and only online tables
+     * are scanned. This will make the map reduce job directly read the table's files. If the table
+     * is not offline, then the job will fail. If the table comes online during the map reduce job,
+     * it is likely that the job will fail.
+     * <p>
+     * To use this option, the map reduce user will need access to read the Accumulo directory in
+     * HDFS.
+     * <p>
+     * Reading the offline table will create the scan time iterator stack in the map process. So any
+     * iterators that are configured for the table will need to be on the mapper's classpath.
+     * <p>
+     * One way to use this feature is to clone a table, take the clone offline, and use the clone as
+     * the input table for a map reduce job. If you plan to map reduce over the data many times, it
+     * may be better to the compact the table, clone it, take it offline, and use the clone for all
+     * map reduce jobs. The reason to do this is that compaction will reduce each tablet in the
+     * table to one file, and it is faster to read from one file.
+     * <p>
+     * There are two possible advantages to reading a tables file directly out of HDFS. First, you
+     * may see better read performance. Second, it will support speculative execution better. When
+     * reading an online table speculative execution can put more load on an already slow tablet
+     * server.
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    ScanOptions<T> offlineScan();
+
+    /**
+     * Enables the use of the {@link org.apache.accumulo.core.client.BatchScanner} in this job.
+     * Using this feature will group Ranges by their source tablet, producing an InputSplit per
+     * tablet rather than per Range. This batching helps to reduce overhead when querying a large
+     * number of small ranges. (ex: when doing quad-tree decomposition for spatial queries)
+     * <p>
+     * In order to achieve good locality of InputSplits this option always clips the input Ranges to
+     * tablet boundaries. This may result in one input Range contributing to several InputSplits.
+     * <p>
+     * Note: calls to {@link #disableAutoAdjustRanges()} is ignored when BatchScan is enabled.
+     * <p>
+     * This configuration is incompatible with:
+     * <ul>
+     * <li>{@link #offlineScan()}</li>
+     * <li>{@link #localIterators()}</li>
+     * <li>{@link #scanIsolation()}</li>
+     * </ul>
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    BatchScanOptions<T> batchScan();
+
+    /**
+     * Finish configuring, verify and serialize options into the JobConf or Job
+     */
+    void store(T j);
+  }
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java
deleted file mode 100644
index d443ce4363..0000000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoop.mapreduce;
-
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.ClientSideIteratorScanner;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.ScannerBase;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.hadoopImpl.mapreduce.InputInfoImpl;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Object containing all the information needed for the Map Reduce job. This object is passed to the
- * different Input Format types see {@link AccumuloInputFormat#setInfo(Job, InputInfo)}. It uses a
- * fluent API:
- *
- * <pre>
- * InputInfo.builder().clientInfo(info).table(name).scanAuths(auths).build();
- *
- * InputInfo.builder().clientProperties(props).table(name).scanAuths(auths).addIterator(cfg)
- *     .executionHints(hints).build();
- * </pre>
- *
- * @since 2.0
- */
-public interface InputInfo {
-  /**
-   * @return the table name set using InputInfo.builder()...table(name)
-   */
-  String getTableName();
-
-  /**
-   * @return the client info set using InputInfo.builder().clientInfo(info)
-   */
-  ClientInfo getClientInfo();
-
-  /**
-   * @return the scan authorizations set using InputInfo.builder()...scanAuths(auths)
-   */
-  Authorizations getScanAuths();
-
-  /**
-   * @return the context set using InputInfo.builder()...classLoaderContext(context)
-   */
-  Optional<String> getContext();
-
-  /**
-   * @return the Ranges set using InputInfo.builder()...ranges(ranges)
-   */
-  Collection<Range> getRanges();
-
-  /**
-   * @return the ColumnFamily,ColumnQualifier Pairs set using
-   *         InputInfo.builder()...fetchColumns(cfcqPairs)
-   */
-  Collection<IteratorSetting.Column> getFetchColumns();
-
-  /**
-   * @return the collection of IteratorSettings set using InputInfo.builder()...addIterator(cfg)
-   */
-  Collection<IteratorSetting> getIterators();
-
-  /**
-   * @return the SamplerConfiguration set using InputInfo.builder()...samplerConfiguration(cfg)
-   */
-  Optional<SamplerConfiguration> getSamplerConfig();
-
-  /**
-   * @return the Execution Hints set using InputInfo.builder()...executionHints(hints)
-   */
-  Map<String,String> getExecutionHints();
-
-  /**
-   * @return boolean if auto adjusting ranges or not
-   */
-  boolean isAutoAdjustRanges();
-
-  /**
-   * @return boolean if using scan isolation or not
-   */
-  boolean isScanIsolation();
-
-  /**
-   * @return boolean if using local iterators or not
-   */
-  boolean isLocalIterators();
-
-  /**
-   * @return boolean if using offline scan or not
-   */
-  boolean isOfflineScan();
-
-  /**
-   * @return boolean if using batch scanner or not
-   */
-  boolean isBatchScan();
-
-  /**
-   * Builder starting point for map reduce input format information.
-   */
-  static InputInfoBuilder.ClientParams builder() {
-    return new InputInfoImpl.InputInfoBuilderImpl();
-  }
-
-  /**
-   * Required build values to be set.
-   *
-   * @since 2.0
-   */
-  interface InputInfoBuilder {
-    /**
-     * Required params for builder
-     *
-     * @since 2.0
-     */
-    interface ClientParams {
-      /**
-       * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
-       * param can be created using {@link ClientInfo#from(Path)} or
-       * {@link ClientInfo#from(Properties)}
-       *
-       * @param clientInfo
-       *          Accumulo connection information
-       */
-      TableParams clientInfo(ClientInfo clientInfo);
-    }
-
-    /**
-     * Required params for builder
-     *
-     * @since 2.0
-     */
-    interface TableParams {
-      /**
-       * Sets the name of the input table, over which this job will scan.
-       *
-       * @param tableName
-       *          the table to use when the tablename is null in the write call
-       */
-      AuthsParams table(String tableName);
-    }
-
-    /**
-     * Required params for builder
-     *
-     * @since 2.0
-     */
-    interface AuthsParams {
-      /**
-       * Sets the {@link Authorizations} used to scan. Must be a subset of the user's
-       * authorizations. If none present use {@link Authorizations#EMPTY}
-       *
-       * @param auths
-       *          the user's authorizations
-       */
-      InputFormatOptions scanAuths(Authorizations auths);
-    }
-
-    /**
-     * Options for batch scan
-     *
-     * @since 2.0
-     */
-    interface BatchScanOptions {
-      /**
-       * @return newly created {@link InputInfo}
-       */
-      InputInfo build();
-    }
-
-    /**
-     * Options for scan
-     *
-     * @since 2.0
-     */
-    interface ScanOptions extends BatchScanOptions {
-      /**
-       * @see InputFormatOptions#scanIsolation()
-       */
-      ScanOptions scanIsolation();
-
-      /**
-       * @see InputFormatOptions#localIterators()
-       */
-      ScanOptions localIterators();
-
-      /**
-       * @see InputFormatOptions#offlineScan()
-       */
-      ScanOptions offlineScan();
-    }
-
-    /**
-     * Optional values to set using fluent API
-     *
-     * @since 2.0
-     */
-    interface InputFormatOptions {
-      /**
-       * Sets the name of the classloader context on this scanner
-       *
-       * @param context
-       *          name of the classloader context
-       */
-      InputFormatOptions classLoaderContext(String context);
-
-      /**
-       * Sets the input ranges to scan for the single input table associated with this job.
-       *
-       * @param ranges
-       *          the ranges that will be mapped over
-       * @see TableOperations#splitRangeByTablets(String, Range, int)
-       */
-      InputFormatOptions ranges(Collection<Range> ranges);
-
-      /**
-       * Restricts the columns that will be mapped over for this job for the default input table.
-       *
-       * @param fetchColumns
-       *          a collection of IteratorSetting.Column objects corresponding to column family and
-       *          column qualifier. If the column qualifier is null, the entire column family is
-       *          selected. An empty set is the default and is equivalent to scanning all columns.
-       */
-      InputFormatOptions fetchColumns(Collection<IteratorSetting.Column> fetchColumns);
-
-      /**
-       * Encode an iterator on the single input table for this job. It is safe to call this method
-       * multiple times. If an iterator is added with the same name, it will be overridden.
-       *
-       * @param cfg
-       *          the configuration of the iterator
-       */
-      InputFormatOptions addIterator(IteratorSetting cfg);
-
-      /**
-       * Set these execution hints on scanners created for input splits. See
-       * {@link ScannerBase#setExecutionHints(java.util.Map)}
-       */
-      InputFormatOptions executionHints(Map<String,String> hints);
-
-      /**
-       * Causes input format to read sample data. If sample data was created using a different
-       * configuration or a tables sampler configuration changes while reading data, then the input
-       * format will throw an error.
-       *
-       * @param samplerConfig
-       *          The sampler configuration that sample must have been created with inorder for
-       *          reading sample data to succeed.
-       *
-       * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
-       */
-      InputFormatOptions samplerConfiguration(SamplerConfiguration samplerConfig);
-
-      /**
-       * Disables the automatic adjustment of ranges for this job. This feature merges overlapping
-       * ranges, then splits them to align with tablet boundaries. Disabling this feature will cause
-       * exactly one Map task to be created for each specified range. Disabling has no effect for
-       * batch scans at it will always automatically adjust ranges.
-       * <p>
-       * By default, this feature is <b>enabled</b>.
-       *
-       * @see #ranges(Collection)
-       */
-      InputFormatOptions disableAutoAdjustRanges();
-
-      /**
-       * Enables the use of the {@link IsolatedScanner} in this job.
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      ScanOptions scanIsolation();
-
-      /**
-       * Enables the use of the {@link ClientSideIteratorScanner} in this job. This feature will
-       * cause the iterator stack to be constructed within the Map task, rather than within the
-       * Accumulo TServer. To use this feature, all classes needed for those iterators must be
-       * available on the classpath for the task.
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      ScanOptions localIterators();
-
-      /**
-       * Enable reading offline tables. By default, this feature is disabled and only online tables
-       * are scanned. This will make the map reduce job directly read the table's files. If the
-       * table is not offline, then the job will fail. If the table comes online during the map
-       * reduce job, it is likely that the job will fail.
-       * <p>
-       * To use this option, the map reduce user will need access to read the Accumulo directory in
-       * HDFS.
-       * <p>
-       * Reading the offline table will create the scan time iterator stack in the map process. So
-       * any iterators that are configured for the table will need to be on the mapper's classpath.
-       * <p>
-       * One way to use this feature is to clone a table, take the clone offline, and use the clone
-       * as the input table for a map reduce job. If you plan to map reduce over the data many
-       * times, it may be better to the compact the table, clone it, take it offline, and use the
-       * clone for all map reduce jobs. The reason to do this is that compaction will reduce each
-       * tablet in the table to one file, and it is faster to read from one file.
-       * <p>
-       * There are two possible advantages to reading a tables file directly out of HDFS. First, you
-       * may see better read performance. Second, it will support speculative execution better. When
-       * reading an online table speculative execution can put more load on an already slow tablet
-       * server.
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      ScanOptions offlineScan();
-
-      /**
-       * Enables the use of the {@link org.apache.accumulo.core.client.BatchScanner} in this job.
-       * Using this feature will group Ranges by their source tablet, producing an InputSplit per
-       * tablet rather than per Range. This batching helps to reduce overhead when querying a large
-       * number of small ranges. (ex: when doing quad-tree decomposition for spatial queries)
-       * <p>
-       * In order to achieve good locality of InputSplits this option always clips the input Ranges
-       * to tablet boundaries. This may result in one input Range contributing to several
-       * InputSplits.
-       * <p>
-       * Note: calls to {@link #disableAutoAdjustRanges()} is ignored when BatchScan is enabled.
-       * <p>
-       * This configuration is incompatible with:
-       * <ul>
-       * <li>{@link #offlineScan()}</li>
-       * <li>{@link #localIterators()}</li>
-       * <li>{@link #scanIsolation()}</li>
-       * </ul>
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      BatchScanOptions batchScan();
-
-      /**
-       * @return newly created {@link InputInfo}
-       */
-      InputInfo build();
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputFormatBuilder.java
new file mode 100644
index 0000000000..e12d803ef9
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputFormatBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.ClientInfo;
+
+/**
+ * Builder for all the information needed for the Map Reduce job. Fluent API used by
+ * {@link AccumuloOutputFormat#configure()}
+ *
+ * @since 2.0
+ */
+public interface OutputFormatBuilder {
+
+  /**
+   * Required params for client
+   *
+   * @since 2.0
+   */
+  interface ClientParams<T> {
+    /**
+     * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
+     * param can be created using {@link ClientInfo#from(Properties)}
+     *
+     * @param clientInfo
+     *          Accumulo connection information
+     */
+    OutputOptions<T> clientInfo(ClientInfo clientInfo);
+  }
+
+  /**
+   * Builder options
+   *
+   * @since 2.0
+   */
+  interface OutputOptions<T> {
+    /**
+     * Sets the default table name to use if one emits a null in place of a table name for a given
+     * mutation. Table names can only be alpha-numeric and underscores.
+     *
+     * @param tableName
+     *          the table to use when the tablename is null in the write call
+     */
+    OutputOptions<T> defaultTable(String tableName);
+
+    /**
+     * Enables the directive to create new tables, as necessary. Table names can only be
+     * alpha-numeric and underscores.
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    OutputOptions<T> createTables();
+
+    /**
+     * Enables the directive to use simulation mode for this job. In simulation mode, no output is
+     * produced. This is useful for testing.
+     * <p>
+     * By default, this feature is <b>disabled</b>.
+     */
+    OutputOptions<T> simulationMode();
+
+    /**
+     * Finish configuring, verify and serialize options into the Job or JobConf
+     */
+    void store(T j);
+  }
+
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java
deleted file mode 100644
index 0ca744363e..0000000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoop.mapreduce;
-
-import java.nio.file.Path;
-import java.util.Optional;
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.hadoopImpl.mapreduce.OutputInfoImpl;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Object containing all the information needed for the Map Reduce job. This object is passed to
- * {@link AccumuloOutputFormat#setInfo(Job, OutputInfo)}. It uses a fluent API like so:
- *
- * <pre>
- * OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build();
- * </pre>
- *
- * @since 2.0
- */
-public interface OutputInfo {
-  /**
-   * @return the client info set using OutputInfo.builder().clientInfo(info)
-   */
-  ClientInfo getClientInfo();
-
-  /**
-   * @return the BatchWriterConfig set using OutputInfo.builder()...batchWriterOptions(conf)
-   */
-  Optional<BatchWriterConfig> getBatchWriterOptions();
-
-  /**
-   * @return the default tame name set using OutputInfo.builder()...defaultTableName(name)
-   */
-  Optional<String> getDefaultTableName();
-
-  /**
-   * @return boolean if creating tables or not
-   */
-  boolean isCreateTables();
-
-  /**
-   * @return boolean if running simulation mode or not
-   */
-  boolean isSimulationMode();
-
-  /**
-   * @return builder for creating a {@link OutputInfo}
-   */
-  public static OutputInfoBuilder.ClientParams builder() {
-    return new OutputInfoImpl.OutputInfoBuilderImpl();
-  }
-
-  /**
-   * Fluent API builder for OutputInfo
-   *
-   * @since 2.0
-   */
-  interface OutputInfoBuilder {
-
-    /**
-     * Required params for client
-     *
-     * @since 2.0
-     */
-    interface ClientParams {
-      /**
-       * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
-       * param can be created using {@link ClientInfo#from(Path)} or
-       * {@link ClientInfo#from(Properties)}
-       *
-       * @param clientInfo
-       *          Accumulo connection information
-       */
-      OutputOptions clientInfo(ClientInfo clientInfo);
-    }
-
-    /**
-     * Builder options
-     *
-     * @since 2.0
-     */
-    interface OutputOptions {
-      /**
-       * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
-       * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the
-       * configuration multiple times overwrites any previous configuration.
-       *
-       * @param bwConfig
-       *          the configuration for the {@link BatchWriter}
-       */
-      OutputOptions batchWriterOptions(BatchWriterConfig bwConfig);
-
-      /**
-       * Sets the default table name to use if one emits a null in place of a table name for a given
-       * mutation. Table names can only be alpha-numeric and underscores.
-       *
-       * @param tableName
-       *          the table to use when the tablename is null in the write call
-       */
-      OutputOptions defaultTableName(String tableName);
-
-      /**
-       * Enables the directive to create new tables, as necessary. Table names can only be
-       * alpha-numeric and underscores.
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      OutputOptions enableCreateTables();
-
-      /**
-       * Enables the directive to use simulation mode for this job. In simulation mode, no output is
-       * produced. This is useful for testing.
-       * <p>
-       * By default, this feature is <b>disabled</b>.
-       */
-      OutputOptions enableSimulationMode();
-
-      /**
-       * @return newly created {@link OutputInfo}
-       */
-      OutputInfo build();
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java
index 0f52588dcc..49890ff142 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java
@@ -27,7 +27,6 @@
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -131,34 +130,6 @@ protected static String getDefaultTableName(JobConf job) {
     return OutputConfigurator.getDefaultTableName(CLASS, job);
   }
 
-  /**
-   * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
-   * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration
-   * multiple times overwrites any previous configuration.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param bwConfig
-   *          the configuration for the {@link BatchWriter}
-   * @since 1.5.0
-   */
-  public static void setBatchWriterOptions(JobConf job, BatchWriterConfig bwConfig) {
-    OutputConfigurator.setBatchWriterOptions(CLASS, job, bwConfig);
-  }
-
-  /**
-   * Gets the {@link BatchWriterConfig} settings.
-   *
-   * @param job
-   *          the Hadoop context for the configured job
-   * @return the configuration object
-   * @since 1.5.0
-   * @see #setBatchWriterOptions(JobConf, BatchWriterConfig)
-   */
-  public static BatchWriterConfig getBatchWriterOptions(JobConf job) {
-    return OutputConfigurator.getBatchWriterOptions(CLASS, job);
-  }
-
   /**
    * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric
    * and underscores.
@@ -249,7 +220,7 @@ public AccumuloRecordWriter(JobConf job) throws AccumuloException, AccumuloSecur
 
       if (!simulate) {
         this.client = Accumulo.newClient().from(getClientInfo(job)).build();
-        mtbw = client.createMultiTableBatchWriter(getBatchWriterOptions(job));
+        mtbw = client.createMultiTableBatchWriter();
       }
     }
 
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java
index c51efa6d89..696f7f34ee 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java
@@ -27,7 +27,6 @@
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -133,34 +132,6 @@ protected static String getDefaultTableName(JobContext context) {
     return OutputConfigurator.getDefaultTableName(CLASS, context.getConfiguration());
   }
 
-  /**
-   * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
-   * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration
-   * multiple times overwrites any previous configuration.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param bwConfig
-   *          the configuration for the {@link BatchWriter}
-   * @since 1.5.0
-   */
-  public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
-    OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig);
-  }
-
-  /**
-   * Gets the {@link BatchWriterConfig} settings.
-   *
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the configuration object
-   * @since 1.5.0
-   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
-   */
-  public static BatchWriterConfig getBatchWriterOptions(JobContext context) {
-    return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration());
-  }
-
   /**
    * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric
    * and underscores.
@@ -252,7 +223,7 @@ public AccumuloRecordWriter(TaskAttemptContext context)
 
       if (!simulate) {
         this.client = Accumulo.newClient().from(getClientInfo(context)).build();
-        mtbw = client.createMultiTableBatchWriter(getBatchWriterOptions(context));
+        mtbw = client.createMultiTableBatchWriter();
       }
     }
 
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputFormatBuilderImpl.java
new file mode 100644
index 0000000000..5e06e1fd2c
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputFormatBuilderImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoopImpl.mapreduce;
+
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setCompressionType;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setDataBlockSize;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setFileBlockSize;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setIndexBlockSize;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setReplication;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSampler;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSummarizers;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.hadoop.mapreduce.FileOutputFormatBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+public class FileOutputFormatBuilderImpl<T> implements FileOutputFormatBuilder,
+    FileOutputFormatBuilder.PathParams<T>, FileOutputFormatBuilder.OutputOptions<T> {
+
+  Path outputPath;
+  Optional<String> comp = Optional.empty();
+  Optional<Long> dataBlockSize = Optional.empty();
+  Optional<Long> fileBlockSize = Optional.empty();
+  Optional<Long> indexBlockSize = Optional.empty();
+  Optional<Integer> replication = Optional.empty();
+  Optional<SamplerConfiguration> sampler = Optional.empty();
+  Collection<SummarizerConfiguration> summarizers = Collections.emptySet();
+
+  @Override
+  public OutputOptions<T> outputPath(Path path) {
+    this.outputPath = Objects.requireNonNull(path);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> compression(String compressionType) {
+    this.comp = Optional.of(compressionType);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> dataBlockSize(long dataBlockSize) {
+    this.dataBlockSize = Optional.of(dataBlockSize);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> fileBlockSize(long fileBlockSize) {
+    this.fileBlockSize = Optional.of(fileBlockSize);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> indexBlockSize(long indexBlockSize) {
+    this.indexBlockSize = Optional.of(indexBlockSize);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> replication(int replication) {
+    this.replication = Optional.of(replication);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> sampler(SamplerConfiguration samplerConfig) {
+    this.sampler = Optional.of(samplerConfig);
+    return this;
+  }
+
+  @Override
+  public OutputOptions<T> summarizers(SummarizerConfiguration... summarizerConfigs) {
+    this.summarizers = Arrays.asList(Objects.requireNonNull(summarizerConfigs));
+    return this;
+  }
+
+  @Override
+  public void store(T j) {
+    if (j instanceof Job) {
+      store((Job) j);
+    } else if (j instanceof JobConf) {
+      store((JobConf) j);
+    } else {
+      throw new IllegalArgumentException("Unexpected type " + j.getClass().getName());
+    }
+  }
+
+  private void store(Job job) {
+    setOutputPath(job, outputPath);
+    if (comp.isPresent())
+      setCompressionType(job, comp.get());
+    if (dataBlockSize.isPresent())
+      setDataBlockSize(job, dataBlockSize.get());
+    if (fileBlockSize.isPresent())
+      setFileBlockSize(job, fileBlockSize.get());
+    if (indexBlockSize.isPresent())
+      setIndexBlockSize(job, indexBlockSize.get());
+    if (replication.isPresent())
+      setReplication(job, replication.get());
+    if (sampler.isPresent())
+      setSampler(job, sampler.get());
+    if (summarizers.size() > 0)
+      setSummarizers(job, summarizers.toArray(new SummarizerConfiguration[0]));
+  }
+
+  private void store(JobConf job) {
+    org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(job, outputPath);
+    if (comp.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setCompressionType(job,
+          comp.get());
+    if (dataBlockSize.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setDataBlockSize(job,
+          dataBlockSize.get());
+    if (fileBlockSize.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setFileBlockSize(job,
+          fileBlockSize.get());
+    if (indexBlockSize.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setIndexBlockSize(job,
+          indexBlockSize.get());
+    if (replication.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setReplication(job,
+          replication.get());
+    if (sampler.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSampler(job,
+          sampler.get());
+    if (summarizers.size() > 0)
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSummarizers(job,
+          summarizers.toArray(new SummarizerConfiguration[0]));
+  }
+
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java
deleted file mode 100644
index 64d300e0b2..0000000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Objects;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
-import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
-import org.apache.hadoop.fs.Path;
-
-public class FileOutputInfoImpl implements FileOutputInfo {
-  Path outputPath;
-  Optional<String> comp;
-  Optional<Long> dataBlockSize;
-  Optional<Long> fileBlockSize;
-  Optional<Long> indexBlockSize;
-  Optional<Integer> replication;
-  Optional<SamplerConfiguration> sampler;
-  Collection<SummarizerConfiguration> summarizers;
-
-  public FileOutputInfoImpl(Path outputPath, Optional<String> comp, Optional<Long> dataBlockSize,
-      Optional<Long> fileBlockSize, Optional<Long> indexBlockSize, Optional<Integer> replication,
-      Optional<SamplerConfiguration> sampler, Collection<SummarizerConfiguration> summarizers) {
-    this.outputPath = outputPath;
-    this.comp = comp;
-    this.dataBlockSize = dataBlockSize;
-    this.fileBlockSize = fileBlockSize;
-    this.indexBlockSize = indexBlockSize;
-    this.replication = replication;
-    this.sampler = sampler;
-    this.summarizers = summarizers;
-  }
-
-  @Override
-  public Path getOutputPath() {
-    return outputPath;
-  }
-
-  @Override
-  public Optional<String> getCompressionType() {
-    return comp;
-  }
-
-  @Override
-  public Optional<Long> getDataBlockSize() {
-    return dataBlockSize;
-  }
-
-  @Override
-  public Optional<Long> getFileBlockSize() {
-    return fileBlockSize;
-  }
-
-  @Override
-  public Optional<Long> getIndexBlockSize() {
-    return indexBlockSize;
-  }
-
-  @Override
-  public Optional<Integer> getReplication() {
-    return replication;
-  }
-
-  @Override
-  public Optional<SamplerConfiguration> getSampler() {
-    return sampler;
-  }
-
-  @Override
-  public Collection<SummarizerConfiguration> getSummarizers() {
-    return summarizers;
-  }
-
-  public static class FileOutputInfoBuilderImpl implements FileOutputInfoBuilder,
-      FileOutputInfoBuilder.PathParams, FileOutputInfoBuilder.OutputOptions {
-    Path outputPath;
-    Optional<String> comp = Optional.empty();
-    Optional<Long> dataBlockSize = Optional.empty();
-    Optional<Long> fileBlockSize = Optional.empty();
-    Optional<Long> indexBlockSize = Optional.empty();
-    Optional<Integer> replication = Optional.empty();
-    Optional<SamplerConfiguration> sampler = Optional.empty();
-    Collection<SummarizerConfiguration> summarizers = Collections.emptySet();
-
-    @Override
-    public OutputOptions outputPath(Path path) {
-      this.outputPath = Objects.requireNonNull(path);
-      ;
-      return this;
-    }
-
-    @Override
-    public OutputOptions compressionType(String compressionType) {
-      this.comp = Optional.of(compressionType);
-      return this;
-    }
-
-    @Override
-    public OutputOptions dataBlockSize(long dataBlockSize) {
-      this.dataBlockSize = Optional.of(dataBlockSize);
-      return this;
-    }
-
-    @Override
-    public OutputOptions fileBlockSize(long fileBlockSize) {
-      this.fileBlockSize = Optional.of(fileBlockSize);
-      return this;
-    }
-
-    @Override
-    public OutputOptions indexBlockSize(long indexBlockSize) {
-      this.indexBlockSize = Optional.of(indexBlockSize);
-      return this;
-    }
-
-    @Override
-    public OutputOptions replication(int replication) {
-      this.replication = Optional.of(replication);
-      return this;
-    }
-
-    @Override
-    public OutputOptions sampler(SamplerConfiguration samplerConfig) {
-      this.sampler = Optional.of(samplerConfig);
-      return this;
-    }
-
-    @Override
-    public OutputOptions summarizers(SummarizerConfiguration... summarizerConfigs) {
-      this.summarizers = Arrays.asList(Objects.requireNonNull(summarizerConfigs));
-      return this;
-    }
-
-    @Override
-    public FileOutputInfo build() {
-      return new FileOutputInfoImpl(outputPath, comp, dataBlockSize, fileBlockSize, indexBlockSize,
-          replication, sampler, summarizers);
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
new file mode 100644
index 0000000000..79deca692a
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoopImpl.mapreduce;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class InputFormatBuilderImpl<T> implements InputFormatBuilder,
+    InputFormatBuilder.ClientParams<T>, InputFormatBuilder.TableParams<T>,
+    InputFormatBuilder.AuthsParams<T>, InputFormatBuilder.InputFormatOptions<T>,
+    InputFormatBuilder.ScanOptions<T>, InputFormatBuilder.BatchScanOptions<T> {
+
+  Class<?> callingClass;
+  String tableName;
+  ClientInfo clientInfo;
+  Authorizations scanAuths;
+
+  Optional<String> context = Optional.empty();
+  Collection<Range> ranges = Collections.emptyList();
+  Collection<IteratorSetting.Column> fetchColumns = Collections.emptyList();
+  Map<String,IteratorSetting> iterators = Collections.emptyMap();
+  Optional<SamplerConfiguration> samplerConfig = Optional.empty();
+  Map<String,String> hints = Collections.emptyMap();
+  BuilderBooleans bools = new BuilderBooleans();
+
+  public InputFormatBuilderImpl(Class<?> callingClass) {
+    this.callingClass = callingClass;
+  }
+
+  @Override
+  public InputFormatBuilder.TableParams<T> clientInfo(ClientInfo clientInfo) {
+    this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.AuthsParams<T> table(String tableName) {
+    this.tableName = Objects.requireNonNull(tableName, "Table name must not be null");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> auths(Authorizations auths) {
+    this.scanAuths = Objects.requireNonNull(auths, "Authorizations must not be null");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> classLoaderContext(String context) {
+    this.context = Optional.of(context);
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> ranges(Collection<Range> ranges) {
+    this.ranges = ImmutableList
+        .copyOf(Objects.requireNonNull(ranges, "Collection of ranges is null"));
+    if (this.ranges.size() == 0)
+      throw new IllegalArgumentException("Specified collection of ranges is empty.");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> fetchColumns(
+      Collection<IteratorSetting.Column> fetchColumns) {
+    this.fetchColumns = ImmutableList
+        .copyOf(Objects.requireNonNull(fetchColumns, "Collection of fetch columns is null"));
+    if (this.fetchColumns.size() == 0)
+      throw new IllegalArgumentException("Specified collection of fetch columns is empty.");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> addIterator(IteratorSetting cfg) {
+    // store iterators by name to prevent duplicates
+    Objects.requireNonNull(cfg, "IteratorSetting must not be null.");
+    if (this.iterators.size() == 0)
+      this.iterators = new LinkedHashMap<>();
+    this.iterators.put(cfg.getName(), cfg);
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> executionHints(Map<String,String> hints) {
+    this.hints = ImmutableMap
+        .copyOf(Objects.requireNonNull(hints, "Map of execution hints must not be null."));
+    if (hints.size() == 0)
+      throw new IllegalArgumentException("Specified map of execution hints is empty.");
+    return this;
+  }
+
+  @Override
+  public InputFormatBuilder.InputFormatOptions<T> samplerConfiguration(
+      SamplerConfiguration samplerConfig) {
+    this.samplerConfig = Optional.of(samplerConfig);
+    return this;
+  }
+
+  @Override
+  public InputFormatOptions<T> disableAutoAdjustRanges() {
+    bools.autoAdjustRanges = false;
+    return this;
+  }
+
+  @Override
+  public ScanOptions<T> scanIsolation() {
+    bools.scanIsolation = true;
+    return this;
+  }
+
+  @Override
+  public ScanOptions<T> localIterators() {
+    bools.localIters = true;
+    return this;
+  }
+
+  @Override
+  public ScanOptions<T> offlineScan() {
+    bools.offlineScan = true;
+    return this;
+  }
+
+  @Override
+  public BatchScanOptions<T> batchScan() {
+    bools.batchScan = true;
+    bools.autoAdjustRanges = true;
+    return this;
+  }
+
+  @Override
+  public void store(T j) {
+    if (j instanceof Job) {
+      store((Job) j);
+    } else if (j instanceof JobConf) {
+      store((JobConf) j);
+    } else {
+      throw new IllegalArgumentException("Unexpected type " + j.getClass().getName());
+    }
+  }
+
+  /**
+   * Final builder method for mapreduce configuration
+   */
+  private void store(Job job) {
+    // TODO validate params are set correctly, possibly call/modify
+    // AbstractInputFormat.validateOptions()
+    AbstractInputFormat.setClientInfo(job, clientInfo);
+    AbstractInputFormat.setScanAuthorizations(job, scanAuths);
+    InputFormatBase.setInputTableName(job, tableName);
+
+    // all optional values
+    if (context.isPresent())
+      AbstractInputFormat.setClassLoaderContext(job, context.get());
+    if (ranges.size() > 0)
+      InputFormatBase.setRanges(job, ranges);
+    if (iterators.size() > 0)
+      InputConfigurator.writeIteratorsToConf(callingClass, job.getConfiguration(),
+          iterators.values());
+    if (fetchColumns.size() > 0)
+      InputConfigurator.fetchColumns(callingClass, job.getConfiguration(), fetchColumns);
+    if (samplerConfig.isPresent())
+      InputFormatBase.setSamplerConfiguration(job, samplerConfig.get());
+    if (hints.size() > 0)
+      InputFormatBase.setExecutionHints(job, hints);
+    InputFormatBase.setAutoAdjustRanges(job, bools.autoAdjustRanges);
+    InputFormatBase.setScanIsolation(job, bools.scanIsolation);
+    InputFormatBase.setLocalIterators(job, bools.localIters);
+    InputFormatBase.setOfflineTableScan(job, bools.offlineScan);
+    InputFormatBase.setBatchScan(job, bools.batchScan);
+  }
+
+  /**
+   * Final builder method for legacy mapred configuration
+   */
+  private void store(JobConf jobConf) {
+    // TODO validate params are set correctly, possibly call/modify
+    // AbstractInputFormat.validateOptions()
+    org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo(jobConf, clientInfo);
+    org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations(jobConf,
+        scanAuths);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName(jobConf, tableName);
+
+    // all optional values
+    if (context.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext(jobConf,
+          context.get());
+    if (ranges.size() > 0)
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges(jobConf, ranges);
+    if (iterators.size() > 0)
+      InputConfigurator.writeIteratorsToConf(callingClass, jobConf, iterators.values());
+    if (fetchColumns.size() > 0)
+      InputConfigurator.fetchColumns(callingClass, jobConf, fetchColumns);
+    if (samplerConfig.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration(jobConf,
+          samplerConfig.get());
+    if (hints.size() > 0)
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints(jobConf, hints);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges(jobConf,
+        bools.autoAdjustRanges);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation(jobConf,
+        bools.scanIsolation);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators(jobConf,
+        bools.localIters);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan(jobConf,
+        bools.offlineScan);
+    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan(jobConf, bools.batchScan);
+  }
+
+  private static class BuilderBooleans {
+    boolean autoAdjustRanges = true;
+    boolean scanIsolation = false;
+    boolean offlineScan = false;
+    boolean localIters = false;
+    boolean batchScan = false;
+  }
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java
deleted file mode 100644
index 878148abe2..0000000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class InputInfoImpl implements InputInfo {
-  String tableName;
-  ClientInfo clientInfo;
-  Authorizations scanAuths;
-
-  // optional values
-  Optional<String> context;
-  Collection<Range> ranges;
-  Collection<IteratorSetting.Column> fetchColumns;
-  Map<String,IteratorSetting> iterators;
-  Optional<SamplerConfiguration> samplerConfig;
-  Map<String,String> hints;
-  InputInfoBooleans bools;
-
-  public InputInfoImpl(String tableName, ClientInfo clientInfo, Authorizations scanAuths,
-      Optional<String> context, Collection<Range> ranges,
-      Collection<IteratorSetting.Column> fetchColumns, Map<String,IteratorSetting> iterators,
-      Optional<SamplerConfiguration> samplerConfig, Map<String,String> hints,
-      InputInfoBooleans bools) {
-    this.tableName = tableName;
-    this.clientInfo = clientInfo;
-    this.scanAuths = scanAuths;
-    this.context = context;
-    this.ranges = ranges;
-    this.fetchColumns = fetchColumns;
-    this.iterators = iterators;
-    this.samplerConfig = samplerConfig;
-    this.hints = hints;
-    this.bools = bools;
-  }
-
-  @Override
-  public String getTableName() {
-    return tableName;
-  }
-
-  @Override
-  public ClientInfo getClientInfo() {
-    return clientInfo;
-  }
-
-  public Authorizations getScanAuths() {
-    return scanAuths;
-  }
-
-  @Override
-  public Optional<String> getContext() {
-    return context;
-  }
-
-  @Override
-  public Collection<Range> getRanges() {
-    return ranges;
-  }
-
-  @Override
-  public Collection<IteratorSetting.Column> getFetchColumns() {
-    return fetchColumns;
-  }
-
-  @Override
-  public Collection<IteratorSetting> getIterators() {
-    return iterators.values();
-  }
-
-  @Override
-  public Optional<SamplerConfiguration> getSamplerConfig() {
-    return samplerConfig;
-  }
-
-  @Override
-  public Map<String,String> getExecutionHints() {
-    return hints;
-  }
-
-  @Override
-  public boolean isAutoAdjustRanges() {
-    return bools.autoAdjustRanges;
-  }
-
-  @Override
-  public boolean isScanIsolation() {
-    return bools.scanIsolation;
-  }
-
-  @Override
-  public boolean isLocalIterators() {
-    return bools.localIters;
-  }
-
-  @Override
-  public boolean isOfflineScan() {
-    return bools.offlineScan;
-  }
-
-  @Override
-  public boolean isBatchScan() {
-    return bools.batchScan;
-  }
-
-  private static class InputInfoBooleans {
-    boolean autoAdjustRanges = true;
-    boolean scanIsolation = false;
-    boolean offlineScan = false;
-    boolean localIters = false;
-    boolean batchScan = false;
-  }
-
-  public static class InputInfoBuilderImpl
-      implements InputInfoBuilder, InputInfoBuilder.ClientParams, InputInfoBuilder.TableParams,
-      InputInfoBuilder.AuthsParams, InputInfoBuilder.InputFormatOptions,
-      InputInfoBuilder.ScanOptions, InputInfoBuilder.BatchScanOptions {
-
-    String tableName;
-    ClientInfo clientInfo;
-    Authorizations scanAuths;
-
-    Optional<String> context = Optional.empty();
-    Collection<Range> ranges = Collections.emptyList();
-    Collection<IteratorSetting.Column> fetchColumns = Collections.emptyList();
-    Map<String,IteratorSetting> iterators = Collections.emptyMap();
-    Optional<SamplerConfiguration> samplerConfig = Optional.empty();
-    Map<String,String> hints = Collections.emptyMap();
-    InputInfoBooleans bools = new InputInfoBooleans();
-
-    @Override
-    public InputInfoBuilder.TableParams clientInfo(ClientInfo clientInfo) {
-      this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.AuthsParams table(String tableName) {
-      this.tableName = Objects.requireNonNull(tableName, "Table name must not be null");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions scanAuths(Authorizations auths) {
-      this.scanAuths = Objects.requireNonNull(auths, "Authorizations must not be null");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions classLoaderContext(String context) {
-      this.context = Optional.of(context);
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions ranges(Collection<Range> ranges) {
-      this.ranges = ImmutableList
-          .copyOf(Objects.requireNonNull(ranges, "Collection of ranges is null"));
-      if (this.ranges.size() == 0)
-        throw new IllegalArgumentException("Specified collection of ranges is empty.");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions fetchColumns(
-        Collection<IteratorSetting.Column> fetchColumns) {
-      this.fetchColumns = ImmutableList
-          .copyOf(Objects.requireNonNull(fetchColumns, "Collection of fetch columns is null"));
-      if (this.fetchColumns.size() == 0)
-        throw new IllegalArgumentException("Specified collection of fetch columns is empty.");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions addIterator(IteratorSetting cfg) {
-      // store iterators by name to prevent duplicates
-      Objects.requireNonNull(cfg, "IteratorSetting must not be null.");
-      if (this.iterators.size() == 0)
-        this.iterators = new LinkedHashMap<>();
-      this.iterators.put(cfg.getName(), cfg);
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions executionHints(Map<String,String> hints) {
-      this.hints = ImmutableMap
-          .copyOf(Objects.requireNonNull(hints, "Map of execution hints must not be null."));
-      if (hints.size() == 0)
-        throw new IllegalArgumentException("Specified map of execution hints is empty.");
-      return this;
-    }
-
-    @Override
-    public InputInfoBuilder.InputFormatOptions samplerConfiguration(
-        SamplerConfiguration samplerConfig) {
-      this.samplerConfig = Optional.of(samplerConfig);
-      return this;
-    }
-
-    @Override
-    public InputFormatOptions disableAutoAdjustRanges() {
-      bools.autoAdjustRanges = false;
-      return this;
-    }
-
-    @Override
-    public ScanOptions scanIsolation() {
-      bools.scanIsolation = true;
-      return this;
-    }
-
-    @Override
-    public ScanOptions localIterators() {
-      bools.localIters = true;
-      return this;
-    }
-
-    @Override
-    public ScanOptions offlineScan() {
-      bools.offlineScan = true;
-      return this;
-    }
-
-    @Override
-    public BatchScanOptions batchScan() {
-      bools.batchScan = true;
-      bools.autoAdjustRanges = true;
-      return this;
-    }
-
-    @Override
-    public InputInfo build() {
-      return new InputInfoImpl(tableName, clientInfo, scanAuths, context, ranges, fetchColumns,
-          iterators, samplerConfig, hints, bools);
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
new file mode 100644
index 0000000000..0e9a1ca422
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoopImpl.mapreduce;
+
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setClientInfo;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setCreateTables;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setDefaultTableName;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setSimulationMode;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.hadoop.mapreduce.OutputFormatBuilder;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+public class OutputFormatBuilderImpl<T>
+    implements OutputFormatBuilder.ClientParams<T>, OutputFormatBuilder.OutputOptions<T> {
+  ClientInfo clientInfo;
+
+  // optional values
+  Optional<String> defaultTableName = Optional.empty();
+  boolean createTables = false;
+  boolean simulationMode = false;
+
+  @Override
+  public OutputFormatBuilder.OutputOptions<T> clientInfo(ClientInfo clientInfo) {
+    this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
+    return this;
+  }
+
+  @Override
+  public OutputFormatBuilder.OutputOptions<T> defaultTable(String tableName) {
+    this.defaultTableName = Optional.of(tableName);
+    return this;
+  }
+
+  @Override
+  public OutputFormatBuilder.OutputOptions<T> createTables() {
+    this.createTables = true;
+    return this;
+  }
+
+  @Override
+  public OutputFormatBuilder.OutputOptions<T> simulationMode() {
+    this.simulationMode = true;
+    return this;
+  }
+
+  @Override
+  public void store(T j) {
+    if (j instanceof Job) {
+      store((Job) j);
+    } else if (j instanceof JobConf) {
+      store((JobConf) j);
+    } else {
+      throw new IllegalArgumentException("Unexpected type " + j.getClass().getName());
+    }
+  }
+
+  private void store(Job job) {
+    setClientInfo(job, clientInfo);
+    if (defaultTableName.isPresent())
+      setDefaultTableName(job, defaultTableName.get());
+    setCreateTables(job, createTables);
+    setSimulationMode(job, simulationMode);
+  }
+
+  private void store(JobConf jobConf) {
+    org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setClientInfo(jobConf,
+        clientInfo);
+    if (defaultTableName.isPresent())
+      org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setDefaultTableName(jobConf,
+          defaultTableName.get());
+    org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setCreateTables(jobConf,
+        createTables);
+    org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setSimulationMode(jobConf,
+        simulationMode);
+
+  }
+
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java
deleted file mode 100644
index 27c94d1b95..0000000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce;
-
-import java.util.Objects;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
-
-public class OutputInfoImpl implements OutputInfo {
-  ClientInfo clientInfo;
-
-  // optional values
-  Optional<String> defaultTableName;
-  Optional<BatchWriterConfig> bwConfig;
-  boolean createTables;
-  boolean simulationMode;
-
-  public OutputInfoImpl(ClientInfo ci, Optional<String> defaultTableName,
-      Optional<BatchWriterConfig> bwConfig, boolean createTables, boolean simulationMode) {
-    this.clientInfo = ci;
-    this.defaultTableName = defaultTableName;
-    this.bwConfig = bwConfig;
-    this.createTables = createTables;
-    this.simulationMode = simulationMode;
-  }
-
-  @Override
-  public ClientInfo getClientInfo() {
-    return clientInfo;
-  }
-
-  @Override
-  public Optional<BatchWriterConfig> getBatchWriterOptions() {
-    return bwConfig;
-  }
-
-  @Override
-  public Optional<String> getDefaultTableName() {
-    return defaultTableName;
-  }
-
-  @Override
-  public boolean isCreateTables() {
-    return createTables;
-  }
-
-  @Override
-  public boolean isSimulationMode() {
-    return simulationMode;
-  }
-
-  public static class OutputInfoBuilderImpl implements OutputInfoBuilder,
-      OutputInfoBuilder.ClientParams, OutputInfoBuilder.OutputOptions {
-    ClientInfo clientInfo;
-
-    // optional values
-    Optional<String> defaultTableName = Optional.empty();
-    Optional<BatchWriterConfig> bwConfig = Optional.empty();
-    boolean createTables = false;
-    boolean simulationMode = false;
-
-    @Override
-    public OutputOptions clientInfo(ClientInfo clientInfo) {
-      this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
-      return this;
-    }
-
-    @Override
-    public OutputOptions batchWriterOptions(BatchWriterConfig bwConfig) {
-      this.bwConfig = Optional.of(bwConfig);
-      return this;
-    }
-
-    @Override
-    public OutputOptions defaultTableName(String tableName) {
-      this.defaultTableName = Optional.of(tableName);
-      return this;
-    }
-
-    @Override
-    public OutputOptions enableCreateTables() {
-      this.createTables = true;
-      return this;
-    }
-
-    @Override
-    public OutputOptions enableSimulationMode() {
-      this.simulationMode = true;
-      return this;
-    }
-
-    @Override
-    public OutputInfo build() {
-      return new OutputInfoImpl(clientInfo, defaultTableName, bwConfig, createTables,
-          simulationMode);
-    }
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
index 772edc1e77..6f4ff2847c 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
@@ -19,8 +19,6 @@
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
 import org.apache.hadoop.mapreduce.Job;
 
 import com.beust.jcommander.Parameter;
@@ -41,10 +39,9 @@ public String getTableName() {
   public void setAccumuloConfigs(Job job) {
     final String tableName = getTableName();
     final ClientInfo info = getClientInfo();
-    AccumuloInputFormat.setInfo(job,
-        InputInfo.builder().clientInfo(info).table(tableName).scanAuths(auths).build());
-    AccumuloOutputFormat.setInfo(job, OutputInfo.builder().clientInfo(info)
-        .defaultTableName(tableName).enableCreateTables().build());
+    AccumuloInputFormat.configure().clientInfo(info).table(tableName).auths(auths).store(job);
+    AccumuloOutputFormat.configure().clientInfo(info).defaultTable(tableName).createTables()
+        .store(job);
   }
 
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
index e6c91db455..76efb964a8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
@@ -19,8 +19,6 @@
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
 import org.apache.hadoop.mapreduce.Job;
 
 import com.beust.jcommander.Parameter;
@@ -34,10 +32,10 @@
   public void setAccumuloConfigs(Job job) {
     final String tableName = getTableName();
     final ClientInfo info = getClientInfo();
-    AccumuloInputFormat.setInfo(job,
-        InputInfo.builder().clientInfo(info).table(tableName).scanAuths(auths).build());
-    AccumuloOutputFormat.setInfo(job, OutputInfo.builder().clientInfo(info)
-        .defaultTableName(tableName).enableCreateTables().build());
+    System.out.println("MIKE here is dahhhhhhhhhhhhhhhhhhh PRINCIPAL= " + info.getPrincipal());
+    AccumuloInputFormat.configure().clientInfo(info).table(tableName).auths(auths).store(job);
+    AccumuloOutputFormat.configure().clientInfo(info).defaultTable(tableName).createTables()
+        .store(job);
   }
 
   public String getTableName() {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java
index 12ce5726dd..9f325d4723 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java
@@ -16,16 +16,19 @@
  */
 package org.apache.accumulo.hadoopImpl.mapreduce.lib;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_DURABILITY;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.clientImpl.DurabilityImpl;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -85,61 +88,30 @@ public static String getDefaultTableName(Class<?> implementingClass, Configurati
   }
 
   /**
-   * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
-   * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration
-   * multiple times overwrites any previous configuration.
-   *
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @param bwConfig
-   *          the configuration for the {@link BatchWriter}
-   * @since 1.6.0
-   */
-  public static void setBatchWriterOptions(Class<?> implementingClass, Configuration conf,
-      BatchWriterConfig bwConfig) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    String serialized;
-    try {
-      bwConfig.write(new DataOutputStream(baos));
-      serialized = new String(baos.toByteArray(), UTF_8);
-      baos.close();
-    } catch (IOException e) {
-      throw new IllegalArgumentException(
-          "unable to serialize " + BatchWriterConfig.class.getName());
-    }
-    conf.set(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG), serialized);
-  }
-
-  /**
-   * Gets the {@link BatchWriterConfig} settings.
-   *
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return the configuration object
-   * @since 1.6.0
-   * @see #setBatchWriterOptions(Class, Configuration, BatchWriterConfig)
+   * Gets the {@link BatchWriterConfig} settings that were stored with ClientInfo
    */
   public static BatchWriterConfig getBatchWriterOptions(Class<?> implementingClass,
       Configuration conf) {
-    String serialized = conf.get(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG));
     BatchWriterConfig bwConfig = new BatchWriterConfig();
-    if (serialized == null || serialized.isEmpty()) {
-      return bwConfig;
-    } else {
-      try {
-        ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(UTF_8));
-        bwConfig.readFields(new DataInputStream(bais));
-        bais.close();
-        return bwConfig;
-      } catch (IOException e) {
-        throw new IllegalArgumentException(
-            "unable to serialize " + BatchWriterConfig.class.getName());
-      }
-    }
+    ClientInfo info = getClientInfo(implementingClass, conf);
+    Properties props = info.getProperties();
+    String property = props.getProperty(BATCH_WRITER_DURABILITY.getKey());
+    if (property != null)
+      bwConfig.setDurability(DurabilityImpl.fromString(property));
+    property = props.getProperty(BATCH_WRITER_MAX_LATENCY_SEC.getKey());
+    if (property != null)
+      bwConfig.setMaxLatency(Long.parseLong(property), TimeUnit.MILLISECONDS);
+    property = props.getProperty(BATCH_WRITER_MAX_MEMORY_BYTES.getKey());
+    if (property != null)
+      bwConfig.setMaxMemory(Long.parseLong(property));
+    property = props.getProperty(BATCH_WRITER_MAX_TIMEOUT_SEC.getKey());
+    if (property != null)
+      bwConfig.setTimeout(Long.parseLong(property), TimeUnit.MILLISECONDS);
+    property = props.getProperty(BATCH_WRITER_MAX_WRITE_THREADS.getKey());
+    if (property != null)
+      bwConfig.setMaxWriteThreads(Integer.parseInt(property));
+
+    return bwConfig;
   }
 
   /**
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java
new file mode 100644
index 0000000000..e7bd8f7263
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.ConfiguratorBase;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapred.AccumuloFileOutputFormat;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(AccumuloFileOutputFormatIT.class);
+  private static final int JOB_VISIBILITY_CACHE_SIZE = 3000;
+  private static final String PREFIX = AccumuloFileOutputFormatIT.class.getSimpleName();
+  private static final String BAD_TABLE = PREFIX + "_mapred_bad_table";
+  private static final String TEST_TABLE = PREFIX + "_mapred_test_table";
+  private static final String EMPTY_TABLE = PREFIX + "_mapred_empty_table";
+
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(
+      RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption("modulus", "3");
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(
+      new File(System.getProperty("user.dir") + "/target"));
+
+  @Test
+  public void testEmptyWrite() throws Exception {
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(EMPTY_TABLE);
+      handleWriteTests(false);
+    }
+  }
+
+  @Test
+  public void testRealWrite() throws Exception {
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(TEST_TABLE);
+      BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
+      Mutation m = new Mutation("Key");
+      m.put("", "", "");
+      bw.addMutation(m);
+      bw.close();
+      handleWriteTests(true);
+    }
+  }
+
+  private static class MRTester extends Configured implements Tool {
+    private static class BadKeyMapper implements Mapper<Key,Value,Key,Value> {
+
+      int index = 0;
+
+      @Override
+      public void map(Key key, Value value, OutputCollector<Key,Value> output, Reporter reporter)
+          throws IOException {
+        try {
+          try {
+            output.collect(key, value);
+            if (index == 2)
+              fail();
+          } catch (Exception e) {
+            log.error(e.toString(), e);
+            assertEquals(2, index);
+          }
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        index++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(2, index);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <table> <outputfile>");
+      }
+
+      String table = args[0];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+      ConfiguratorBase.setVisibilityCacheSize(job, JOB_VISIBILITY_CACHE_SIZE);
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+          .auths(Authorizations.EMPTY).store(job);
+      AccumuloFileOutputFormat.configure().outputPath(new Path(args[1])).sampler(SAMPLER_CONFIG)
+          .store(job);
+
+      job.setMapperClass(BAD_TABLE.equals(table) ? BadKeyMapper.class : IdentityMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloFileOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  private void handleWriteTests(boolean content) throws Exception {
+    File f = folder.newFile(testName.getMethodName());
+    if (f.delete()) {
+      log.debug("Deleted {}", f);
+    }
+    MRTester.main(new String[] {content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
+
+    assertTrue(f.exists());
+    File[] files = f.listFiles(file -> file.getName().startsWith("part-m-"));
+    assertNotNull(files);
+    if (content) {
+      assertEquals(1, files.length);
+      assertTrue(files[0].exists());
+
+      Configuration conf = CachedConfiguration.getInstance();
+      DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
+      FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder()
+          .forFile(files[0].toString(), FileSystem.getLocal(conf), conf)
+          .withTableConfiguration(acuconf).build()
+          .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
+      assertNotNull(sample);
+    } else {
+      assertEquals(0, files.length);
+    }
+  }
+
+  @Test
+  public void writeBadVisibility() throws Exception {
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(BAD_TABLE);
+      BatchWriter bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
+      Mutation m = new Mutation("r1");
+      m.put("cf1", "cq1", "A&B");
+      m.put("cf1", "cq1", "A&B");
+      m.put("cf1", "cq2", "A&");
+      bw.addMutation(m);
+      bw.close();
+      File f = folder.newFile(testName.getMethodName());
+      if (f.delete()) {
+        log.debug("Deleted {}", f);
+      }
+      MRTester.main(new String[] {BAD_TABLE, f.getAbsolutePath()});
+      assertNull(e1);
+      assertNull(e2);
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloInputFormatIT.java
new file mode 100644
index 0000000000..083865f632
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloInputFormatIT.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapred.RangeInputSplit;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
+
+  @BeforeClass
+  public static void setupClass() {
+    System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
+  }
+
+  private static AssertionError e1 = null;
+  private static int e1Count = 0;
+  private static AssertionError e2 = null;
+  private static int e2Count = 0;
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter)
+          throws IOException {
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+          e1Count++;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+          e2Count++;
+        }
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 1 && args.length != 3) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <table> [<batchScan> <scan sample>]");
+      }
+
+      String table = args[0];
+      Boolean batchScan = false;
+      boolean sample = false;
+      if (args.length == 3) {
+        batchScan = Boolean.parseBoolean(args[1]);
+        sample = Boolean.parseBoolean(args[2]);
+      }
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      InputFormatBuilder.InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure()
+          .clientInfo(getClientInfo()).table(table).auths(Authorizations.EMPTY);
+      if (batchScan)
+        opts.batchScan();
+      if (sample) {
+        opts.samplerConfiguration(SAMPLER_CONFIG);
+      }
+      opts.store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String... args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void testMap() throws Exception {
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(table);
+      BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      e1 = null;
+      e2 = null;
+
+      MRTester.main(table);
+      assertNull(e1);
+      assertNull(e2);
+    }
+  }
+
+  private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(
+      RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption("modulus", "3");
+
+  @Test
+  public void testSample() throws Exception {
+    final String TEST_TABLE_3 = getUniqueNames(1)[0];
+
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(TEST_TABLE_3,
+          new NewTableConfiguration().enableSampling(SAMPLER_CONFIG));
+      BatchWriter bw = c.createBatchWriter(TEST_TABLE_3, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      MRTester.main(TEST_TABLE_3, "False", "True");
+      assertEquals(38, e1Count);
+      assertEquals(1, e2Count);
+
+      e2Count = e1Count = 0;
+      MRTester.main(TEST_TABLE_3, "False", "False");
+      assertEquals(0, e1Count);
+      assertEquals(0, e2Count);
+
+      e2Count = e1Count = 0;
+      MRTester.main(TEST_TABLE_3, "True", "True");
+      assertEquals(38, e1Count);
+      assertEquals(1, e2Count);
+    }
+  }
+
+  @Test
+  public void testCorrectRangeInputSplits() throws Exception {
+    JobConf job = new JobConf();
+
+    String table = getUniqueNames(1)[0];
+    Authorizations auths = new Authorizations("foo");
+    Collection<IteratorSetting.Column> fetchColumns = Collections
+        .singleton(new IteratorSetting.Column(new Text("foo"), new Text("bar")));
+
+    try (AccumuloClient accumuloClient = getAccumuloClient()) {
+      accumuloClient.tableOperations().create(table);
+
+      AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table).auths(auths)
+          .fetchColumns(fetchColumns).scanIsolation().localIterators().store(job);
+
+      AccumuloInputFormat aif = new AccumuloInputFormat();
+
+      InputSplit[] splits = aif.getSplits(job, 1);
+
+      assertEquals(1, splits.length);
+
+      InputSplit split = splits[0];
+
+      assertEquals(RangeInputSplit.class, split.getClass());
+
+      RangeInputSplit risplit = (RangeInputSplit) split;
+
+      assertEquals(table, risplit.getTableName());
+      assertTrue(risplit.isIsolatedScan());
+      assertTrue(risplit.usesLocalIterators());
+      assertEquals(fetchColumns, risplit.getFetchedColumns());
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloOutputFormatIT.java
new file mode 100644
index 0000000000..973ebc3809
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloOutputFormatIT.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloOutputFormatIT extends ConfigurableMacBase {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "1");
+    cfg.setNumTservers(1);
+  }
+
+  // Prevent regression of ACCUMULO-3709.
+  @Test
+  public void testMapred() throws Exception {
+    try (AccumuloClient accumuloClient = getClient()) {
+      // create a table and put some data in it
+      accumuloClient.tableOperations().create(testName.getMethodName());
+
+      JobConf job = new JobConf();
+      BatchWriterConfig batchConfig = new BatchWriterConfig();
+      // no flushes!!!!!
+      batchConfig.setMaxLatency(0, TimeUnit.MILLISECONDS);
+      // use a single thread to ensure our update session times out
+      batchConfig.setMaxWriteThreads(1);
+      // set the max memory so that we ensure we don't flush on the write.
+      batchConfig.setMaxMemory(Long.MAX_VALUE);
+      AccumuloOutputFormat outputFormat = new AccumuloOutputFormat();
+      AccumuloOutputFormat.configure().clientInfo(getClientInfo(batchConfig)).store(job);
+      RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null);
+
+      try {
+        for (int i = 0; i < 3; i++) {
+          Mutation m = new Mutation(new Text(String.format("%08d", i)));
+          for (int j = 0; j < 3; j++) {
+            m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8)));
+          }
+          writer.write(new Text(testName.getMethodName()), m);
+        }
+
+      } catch (Exception e) {
+        e.printStackTrace();
+        // we don't want the exception to come from write
+      }
+
+      accumuloClient.securityOperations().revokeTablePermission("root", testName.getMethodName(),
+          TablePermission.WRITE);
+
+      try {
+        writer.close(null);
+        fail("Did not throw exception");
+      } catch (IOException ex) {
+        log.info(ex.getMessage(), ex);
+        assertTrue(ex.getCause() instanceof MutationsRejectedException);
+      }
+    }
+  }
+
+  private static AssertionError e1 = null;
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+      OutputCollector<Text,Mutation> finalOutput;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) {
+        finalOutput = output;
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        finalOutput.collect(new Text(), m);
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 6) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName()
+            + " <user> <pass> <inputtable> <outputtable> <instanceName> <zooKeepers>");
+      }
+
+      String user = args[0];
+      String pass = args[1];
+      String table1 = args[2];
+      String table2 = args[3];
+      String instanceName = args[4];
+      String zooKeepers = args[5];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      ClientInfo info = Accumulo.newClient().to(instanceName, zooKeepers).as(user, pass).info();
+
+      AccumuloInputFormat.configure().clientInfo(info).table(table1).auths(Authorizations.EMPTY)
+          .store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.configure().clientInfo(info).defaultTable(table2).store(job);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void testMR() throws Exception {
+    try (AccumuloClient c = getClient()) {
+      String instanceName = getCluster().getInstanceName();
+      String table1 = instanceName + "_t1";
+      String table2 = instanceName + "_t2";
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      MRTester.main(new String[] {"root", ROOT_PASSWORD, table1, table2, instanceName,
+          getCluster().getZooKeepers()});
+      assertNull(e1);
+
+      try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+        assertTrue(iter.hasNext());
+        Entry<Key,Value> entry = iter.next();
+        assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
+        assertFalse(iter.hasNext());
+      }
+    }
+  }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java
index 2147554b34..c3854377c0 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java
@@ -39,7 +39,6 @@
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.hadoop.mapred.AccumuloRowInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -166,8 +165,8 @@ public int run(String[] args) throws Exception {
 
       job.setInputFormat(AccumuloRowInputFormat.class);
 
-      AccumuloRowInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo())
-          .table(table).scanAuths(Authorizations.EMPTY).build());
+      AccumuloRowInputFormat.configure().clientInfo(getClientInfo()).table(table)
+          .auths(Authorizations.EMPTY).store(job);
 
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/TokenFileIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/TokenFileIT.java
new file mode 100644
index 0000000000..fb4e4c354b
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/TokenFileIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class TokenFileIT extends AccumuloClusterHarness {
+  private static AssertionError e1 = null;
+
+  private static class MRTokenFileTester extends Configured implements Tool {
+    private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+      OutputCollector<Text,Mutation> finalOutput;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter)
+          throws IOException {
+        finalOutput = output;
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        finalOutput.collect(new Text(), m);
+      }
+
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 3) {
+        throw new IllegalArgumentException("Usage : " + MRTokenFileTester.class.getName()
+            + " <token file> <inputtable> <outputtable>");
+      }
+
+      String tokenFile = args[0];
+      ClientInfo ci = ClientInfo.from(Paths.get(tokenFile));
+      String table1 = args[1];
+      String table2 = args[2];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientInfo(ci).table(table1).auths(Authorizations.EMPTY)
+          .store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.configure().clientInfo(ci).defaultTable(table2).store(job);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
+    public static void main(String[] args) throws Exception {
+      Configuration conf = CachedConfiguration.getInstance();
+      conf.set("hadoop.tmp.dir", new File(args[0]).getParent());
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
+    }
+  }
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(
+      new File(System.getProperty("user.dir") + "/target"));
+
+  @Test
+  public void testMR() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      File tf = folder.newFile("client.properties");
+      try (PrintStream out = new PrintStream(tf)) {
+        getClientInfo().getProperties().store(out, "Credentials for " + getClass().getName());
+      }
+
+      MRTokenFileTester.main(new String[] {tf.getAbsolutePath(), table1, table2});
+      assertNull(e1);
+
+      try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+        assertTrue(iter.hasNext());
+        Entry<Key,Value> entry = iter.next();
+        assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
+        assertFalse(iter.hasNext());
+      }
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java
new file mode 100644
index 0000000000..7c482a46ee
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
+
+  private String PREFIX;
+  private String BAD_TABLE;
+  private String TEST_TABLE;
+  private String EMPTY_TABLE;
+
+  private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(
+      RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption("modulus", "3");
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(
+      new File(System.getProperty("user.dir") + "/target"));
+
+  @Before
+  public void setup() throws Exception {
+    PREFIX = testName.getMethodName() + "_";
+    BAD_TABLE = PREFIX + "_mapreduce_bad_table";
+    TEST_TABLE = PREFIX + "_mapreduce_test_table";
+    EMPTY_TABLE = PREFIX + "_mapreduce_empty_table";
+
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(EMPTY_TABLE);
+      c.tableOperations().create(TEST_TABLE);
+      c.tableOperations().create(BAD_TABLE);
+      BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
+      Mutation m = new Mutation("Key");
+      m.put("", "", "");
+      bw.addMutation(m);
+      bw.close();
+      bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
+      m = new Mutation("r1");
+      m.put("cf1", "cq1", "A&B");
+      m.put("cf1", "cq1", "A&B");
+      m.put("cf1", "cq2", "A&");
+      bw.addMutation(m);
+      bw.close();
+    }
+  }
+
+  @Test
+  public void testEmptyWrite() throws Exception {
+    handleWriteTests(false);
+  }
+
+  @Test
+  public void testRealWrite() throws Exception {
+    handleWriteTests(true);
+  }
+
+  private static class MRTester extends Configured implements Tool {
+    private static class BadKeyMapper extends Mapper<Key,Value,Key,Value> {
+      int index = 0;
+
+      @Override
+      protected void map(Key key, Value value, Context context)
+          throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+        try {
+          try {
+            context.write(key, value);
+            if (index == 2)
+              fail();
+          } catch (Exception e) {
+            assertEquals(2, index);
+          }
+        } catch (AssertionError e) {
+          assertionErrors.put(table + "_map", e);
+        }
+        index++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+        try {
+          assertEquals(2, index);
+        } catch (AssertionError e) {
+          assertionErrors.put(table + "_cleanup", e);
+        }
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <table> <outputfile>");
+      }
+
+      String table = args[0];
+      assertionErrors.put(table + "_map", new AssertionError("Dummy_map"));
+      assertionErrors.put(table + "_cleanup", new AssertionError("Dummy_cleanup"));
+
+      Job job = Job.getInstance(getConf(),
+          this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+          .auths(Authorizations.EMPTY).store(job);
+      AccumuloFileOutputFormat.configure().outputPath(new Path(args[1])).sampler(SAMPLER_CONFIG)
+          .store(job);
+
+      job.setMapperClass(
+          table.endsWith("_mapreduce_bad_table") ? BadKeyMapper.class : Mapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+      job.getConfiguration().set("MRTester_tableName", table);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  private void handleWriteTests(boolean content) throws Exception {
+    File f = folder.newFile(testName.getMethodName());
+    assertTrue(f.delete());
+    MRTester.main(new String[] {content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
+
+    assertTrue(f.exists());
+    File[] files = f.listFiles(file -> file.getName().startsWith("part-m-"));
+    assertNotNull(files);
+    if (content) {
+      assertEquals(1, files.length);
+      assertTrue(files[0].exists());
+
+      Configuration conf = CachedConfiguration.getInstance();
+      DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
+      FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder()
+          .forFile(files[0].toString(), FileSystem.getLocal(conf), conf)
+          .withTableConfiguration(acuconf).build()
+          .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
+      assertNotNull(sample);
+    } else {
+      assertEquals(0, files.length);
+    }
+  }
+
+  // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks (to
+  // ensure test correctness),
+  // so error tests should check to see if there is at least one error (could be more depending on
+  // the test) rather than zero
+  private static Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create();
+
+  @Test
+  public void writeBadVisibility() throws Exception {
+    File f = folder.newFile(testName.getMethodName());
+    assertTrue(f.delete());
+    MRTester.main(new String[] {BAD_TABLE, f.getAbsolutePath()});
+    assertTrue(f.exists());
+    assertEquals(1, assertionErrors.get(BAD_TABLE + "_map").size());
+    assertEquals(1, assertionErrors.get(BAD_TABLE + "_cleanup").size());
+  }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/NewAccumuloInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloInputFormatIT.java
similarity index 87%
rename from hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/NewAccumuloInputFormatIT.java
rename to hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloInputFormatIT.java
index a7755509cb..a80672677a 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/NewAccumuloInputFormatIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloInputFormatIT.java
@@ -48,7 +48,7 @@
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions;
 import org.apache.accumulo.hadoopImpl.mapreduce.BatchInputSplit;
 import org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -75,7 +75,7 @@
  *
  * @since 2.0
  */
-public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
 
   AccumuloInputFormat inputFormat;
 
@@ -106,8 +106,8 @@ public void testGetSplits() throws Exception {
     insertData(table, currentTimeMillis());
 
     Job job = Job.getInstance();
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).scanIsolation().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).scanIsolation().store(job);
 
     // split table
     TreeSet<Text> splitsToAdd = new TreeSet<>();
@@ -126,14 +126,14 @@ public void testGetSplits() throws Exception {
     List<Range> ranges = new ArrayList<>();
     for (Text text : actualSplits)
       ranges.add(new Range(text));
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).ranges(ranges).build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).ranges(ranges).store(job);
     splits = inputFormat.getSplits(job);
     assertEquals(actualSplits.size(), splits.size());
 
     // offline mode
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).offlineScan().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).offlineScan().store(job);
     try {
       inputFormat.getSplits(job);
       fail("An exception should have been thrown");
@@ -148,19 +148,19 @@ public void testGetSplits() throws Exception {
     for (int i = 0; i < 5; i++)
       // overlapping ranges
       ranges.add(new Range(String.format("%09d", i), String.format("%09d", i + 2)));
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).ranges(ranges).offlineScan().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).ranges(ranges).offlineScan().store(job);
     splits = inputFormat.getSplits(job);
     assertEquals(2, splits.size());
 
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).disableAutoAdjustRanges().offlineScan().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).disableAutoAdjustRanges().offlineScan().store(job);
     splits = inputFormat.getSplits(job);
     assertEquals(ranges.size(), splits.size());
 
     // BatchScan not available for offline scans
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).batchScan().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).batchScan().store(job);
     try {
       inputFormat.getSplits(job);
       fail("An exception should have been thrown");
@@ -168,28 +168,28 @@ public void testGetSplits() throws Exception {
 
     // table online tests
     client.tableOperations().online(table, true);
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).store(job);
     // test for resumption of success
     splits = inputFormat.getSplits(job);
     assertEquals(2, splits.size());
 
     // BatchScan not available with isolated iterators
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).scanIsolation().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).scanIsolation().store(job);
 
     splits = inputFormat.getSplits(job);
     assertEquals(2, splits.size());
 
     // BatchScan not available with local iterators
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).localIterators().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).localIterators().store(job);
 
     splits = inputFormat.getSplits(job);
     assertEquals(2, splits.size());
 
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
-        .scanAuths(Authorizations.EMPTY).batchScan().build());
+    AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+        .auths(Authorizations.EMPTY).batchScan().store(job);
 
     // Check we are getting back correct type pf split
     splits = inputFormat.getSplits(job);
@@ -285,14 +285,14 @@ public int run(String[] args) throws Exception {
 
       job.setInputFormatClass(inputFormatClass);
 
-      InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder()
-          .clientInfo(getClientInfo()).table(table).scanAuths(Authorizations.EMPTY);
+      InputFormatOptions<Job> opts = AccumuloInputFormat.configure().clientInfo(getClientInfo())
+          .table(table).auths(Authorizations.EMPTY);
       if (sample)
         opts = opts.samplerConfiguration(SAMPLER_CONFIG);
       if (batchScan)
-        AccumuloInputFormat.setInfo(job, opts.batchScan().build());
+        opts.batchScan().store(job);
       else
-        AccumuloInputFormat.setInfo(job, opts.build());
+        opts.store(job);
 
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -406,10 +406,9 @@ public void testCorrectRangeInputSplits() throws Exception {
     AccumuloClient accumuloClient = getAccumuloClient();
     accumuloClient.tableOperations().create(table);
 
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder()
-        .clientInfo(getClientInfo()).table(table).scanAuths(auths);
-    AccumuloInputFormat.setInfo(job,
-        opts.fetchColumns(fetchColumns).scanIsolation().localIterators().build());
+    InputFormatOptions<Job> opts = AccumuloInputFormat.configure().clientInfo(getClientInfo())
+        .table(table).auths(auths);
+    opts.fetchColumns(fetchColumns).scanIsolation().localIterators().store(job);
 
     AccumuloInputFormat aif = new AccumuloInputFormat();
 
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloOutputFormatIT.java
new file mode 100644
index 0000000000..badebe9fbc
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloOutputFormatIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloOutputFormatIT extends AccumuloClusterHarness {
+  private static AssertionError e1 = null;
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        context.write(new Text(), m);
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <inputtable> <outputtable>");
+      }
+
+      String table1 = args[0];
+      String table2 = args[1];
+
+      Job job = Job.getInstance(getConf(),
+          this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table1)
+          .auths(Authorizations.EMPTY).store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.configure().clientInfo(getClientInfo()).defaultTable(table2).store(job);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void testMR() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      MRTester.main(new String[] {table1, table2});
+      assertNull(e1);
+
+      try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+        assertTrue(iter.hasNext());
+        Entry<Key,Value> entry = iter.next();
+        assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
+        assertFalse(iter.hasNext());
+      }
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java
index a125a7ced2..6a836cf809 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java
@@ -39,7 +39,6 @@
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloRowInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -160,8 +159,8 @@ public int run(String[] args) throws Exception {
 
       job.setInputFormatClass(AccumuloRowInputFormat.class);
 
-      AccumuloRowInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo())
-          .table(table).scanAuths(Authorizations.EMPTY).build());
+      AccumuloRowInputFormat.configure().clientInfo(getClientInfo()).table(table)
+          .auths(Authorizations.EMPTY).store(job);
 
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
new file mode 100644
index 0000000000..75c3efbc5e
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+import com.beust.jcommander.Parameter;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class RowHashIT extends ConfigurableMacBase {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  public static final String hadoopTmpDirArg = "-Dhadoop.tmp.dir=" + System.getProperty("user.dir")
+      + "/target/hadoop-tmp";
+
+  static final String tablename = "mapredf";
+  static final String input_cf = "cf-HASHTYPE";
+  static final String input_cq = "cq-NOTHASHED";
+  static final String input_cfcq = input_cf + ":" + input_cq;
+  static final String output_cq = "cq-MD4BASE64";
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient client = getClient()) {
+      runTest(client, getCluster());
+    }
+  }
+
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "md5 is okay for testing")
+  static void runTest(AccumuloClient c, MiniAccumuloClusterImpl cluster) throws AccumuloException,
+      AccumuloSecurityException, TableExistsException, TableNotFoundException,
+      MutationsRejectedException, IOException, InterruptedException, NoSuchAlgorithmException {
+    c.tableOperations().create(tablename);
+    BatchWriter bw = c.createBatchWriter(tablename, new BatchWriterConfig());
+    for (int i = 0; i < 10; i++) {
+      Mutation m = new Mutation("" + i);
+      m.put(input_cf, input_cq, "row" + i);
+      bw.addMutation(m);
+    }
+    bw.close();
+    Process hash = cluster.exec(RowHash.class, Collections.singletonList(hadoopTmpDirArg), "-i",
+        c.info().getInstanceName(), "-z", c.info().getZooKeepers(), "-u", "root", "-p",
+        ROOT_PASSWORD, "-t", tablename, "--column", input_cfcq);
+    assertEquals(0, hash.waitFor());
+
+    try (Scanner s = c.createScanner(tablename, Authorizations.EMPTY)) {
+      s.fetchColumn(new Text(input_cf), new Text(output_cq));
+      int i = 0;
+      for (Entry<Key,Value> entry : s) {
+        MessageDigest md = MessageDigest.getInstance("MD5");
+        byte[] check = Base64.getEncoder().encode(md.digest(("row" + i).getBytes()));
+        assertEquals(entry.getValue().toString(), new String(check));
+        i++;
+      }
+    }
+  }
+
+  public static class RowHash extends Configured implements Tool {
+    /**
+     * The Mapper class that given a row number, will generate the appropriate output line.
+     */
+    public class HashDataMapper extends Mapper<Key,Value,Text,Mutation> {
+      @Override
+      public void map(Key row, Value data, Context context)
+          throws IOException, InterruptedException {
+        Mutation m = new Mutation(row.getRow());
+        m.put(new Text("cf-HASHTYPE"), new Text("cq-MD5BASE64"),
+            new Value(Base64.getEncoder().encode(MD5Hash.digest(data.toString()).getDigest())));
+        context.write(null, m);
+        context.progress();
+      }
+
+      @Override
+      public void setup(Context job) {}
+    }
+
+    public class Opts extends MapReduceClientOnRequiredTable {
+      @Parameter(names = "--column", required = true)
+      String column;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      Job job = Job.getInstance(getConf());
+      job.setJobName(this.getClass().getName());
+      job.setJarByClass(this.getClass());
+      RowHash.Opts opts = new RowHash.Opts();
+      opts.parseArgs(RowHash.class.getName(), args);
+      job.setInputFormatClass(AccumuloInputFormat.class);
+      opts.setAccumuloConfigs(job);
+
+      String col = opts.column;
+      int idx = col.indexOf(":");
+      Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
+      Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
+      if (cf.getLength() > 0)
+        AccumuloInputFormat.configure().clientInfo(opts.getClientInfo()).table(opts.getTableName())
+            .auths(Authorizations.EMPTY)
+            .fetchColumns(Collections.singleton(new IteratorSetting.Column(cf, cq))).store(job);
+
+      job.setMapperClass(RowHash.HashDataMapper.class);
+      job.setMapOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(Mutation.class);
+
+      job.setNumReduceTasks(0);
+
+      job.setOutputFormatClass(AccumuloOutputFormat.class);
+      AccumuloOutputFormat.configure().clientInfo(opts.getClientInfo()).store(job);
+
+      job.waitForCompletion(true);
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      ToolRunner.run(new Configuration(), new RowHash(), args);
+    }
+  }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/TokenFileIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/TokenFileIT.java
new file mode 100644
index 0000000000..ae29912d58
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/TokenFileIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class TokenFileIT extends AccumuloClusterHarness {
+  private static AssertionError e1 = null;
+
+  private static class MRTokenFileTester extends Configured implements Tool {
+    private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        context.write(new Text(), m);
+      }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 3) {
+        throw new IllegalArgumentException("Usage : " + MRTokenFileTester.class.getName()
+            + " <token file> <inputtable> <outputtable>");
+      }
+
+      String tokenFile = args[0];
+      ClientInfo ci = ClientInfo.from(Paths.get(tokenFile));
+      String table1 = args[1];
+      String table2 = args[2];
+
+      Job job = Job.getInstance(getConf(),
+          this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientInfo(ci).table(table1).auths(Authorizations.EMPTY)
+          .store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.configure().clientInfo(ci).defaultTable(table2).store(job);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
+    public static void main(String[] args) throws Exception {
+      Configuration conf = CachedConfiguration.getInstance();
+      conf.set("hadoop.tmp.dir", new File(args[0]).getParent());
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
+    }
+  }
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(
+      new File(System.getProperty("user.dir") + "/target"));
+
+  @Test
+  public void testMR() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    try (AccumuloClient c = getAccumuloClient()) {
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+        m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      File tf = folder.newFile("client.properties");
+      try (PrintStream out = new PrintStream(tf)) {
+        getClientInfo().getProperties().store(out, "Credentials for " + getClass().getName());
+      }
+
+      MRTokenFileTester.main(new String[] {tf.getAbsolutePath(), table1, table2});
+      assertNull(e1);
+
+      try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+        assertTrue(iter.hasNext());
+        Entry<Key,Value> entry = iter.next();
+        assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
+        assertFalse(iter.hasNext());
+      }
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java
index 7e1db6620a..c170918199 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java
@@ -31,7 +31,6 @@
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
-import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
@@ -57,10 +56,9 @@ public void validateConfiguration() throws IOException, InterruptedException {
         .addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build();
 
     JobConf job = new JobConf();
-    AccumuloFileOutputFormat.setInfo(job,
-        FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
-            .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig)
-            .summarizers(sc1, sc2).build());
+    AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+        .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+        .summarizers(sc1, sc2).store(job);
 
     AccumuloConfiguration acuconf = FileOutputConfigurator
         .getAccumuloConfiguration(AccumuloFileOutputFormat.class, job);
@@ -89,9 +87,9 @@ public void validateConfiguration() throws IOException, InterruptedException {
     samplerConfig.addOption("modulus", "100003");
 
     job = new JobConf();
-    AccumuloFileOutputFormat.setInfo(job,
-        FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
-            .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig).build());
+    AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+        .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+        .store(job);
 
     acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job);
 
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
index e204e8fd3e..6831869226 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
@@ -38,7 +38,7 @@
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -77,11 +77,11 @@ public static void setupClientInfo() {
    */
   @Test
   public void testSetIterator() throws IOException {
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
+    InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+        .table("test").auths(Authorizations.EMPTY);
 
     IteratorSetting is = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(is).build());
+    opts.addIterator(is).store(job);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     is.write(new DataOutputStream(baos));
     String iterators = job.get("AccumuloInputFormat.ScanOpts.Iterators");
@@ -90,16 +90,15 @@ public void testSetIterator() throws IOException {
 
   @Test
   public void testAddIterator() {
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
+    InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+        .table("test").auths(Authorizations.EMPTY);
 
     IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
     IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class);
     IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class);
     iter3.addOption("v1", "1");
     iter3.addOption("junk", "\0omg:!\\xyzzy");
-    AccumuloInputFormat.setInfo(job,
-        opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+    opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
 
@@ -143,9 +142,9 @@ public void testIteratorOptionEncoding() throws Throwable {
     IteratorSetting iter1 = new IteratorSetting(1, "iter1", WholeRowIterator.class);
     iter1.addOption(key, value);
     // also test if reusing options will create duplicate iterators
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).build());
+    InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+        .table("test").auths(Authorizations.EMPTY);
+    opts.addIterator(iter1).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
     assertEquals(1, list.size());
@@ -155,7 +154,7 @@ public void testIteratorOptionEncoding() throws Throwable {
     IteratorSetting iter2 = new IteratorSetting(1, "iter2", WholeRowIterator.class);
     iter2.addOption(key, value);
     iter2.addOption(key + "2", value);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).addIterator(iter2).build());
+    opts.addIterator(iter1).addIterator(iter2).store(job);
     list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
     assertEquals(2, list.size());
     assertEquals(1, list.get(0).getOptions().size());
@@ -173,9 +172,8 @@ public void testGetIteratorSettings() {
     IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class.getName());
     IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class.getName());
     IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class.getName());
-    AccumuloInputFormat.setInfo(job,
-        InputInfo.builder().clientInfo(clientInfo).table("test").scanAuths(Authorizations.EMPTY)
-            .addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
 
@@ -206,8 +204,8 @@ public void testSetRegex() {
 
     IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
     RegExFilter.setRegexs(is, regex, null, null, null, false);
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
-        .scanAuths(Authorizations.EMPTY).addIterator(is).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(is).store(job);
 
     assertEquals(regex,
         InputConfigurator.getIterators(AccumuloInputFormat.class, job).get(0).getName());
@@ -221,8 +219,8 @@ public void testEmptyColumnFamily() throws IOException {
     cols.add(new IteratorSetting.Column(new Text(""), new Text("bar")));
     cols.add(new IteratorSetting.Column(new Text(""), new Text("")));
     cols.add(new IteratorSetting.Column(new Text("foo"), new Text("")));
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
-        .scanAuths(Authorizations.EMPTY).fetchColumns(cols).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .fetchColumns(cols).store(job);
 
     assertEquals(cols, InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job));
   }
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
index 5811ebcef6..0f9f77746a 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
@@ -16,21 +16,16 @@
  */
 package org.apache.accumulo.hadoop.mapred;
 
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.getBatchWriterOptions;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import java.io.IOException;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Test;
@@ -38,14 +33,11 @@
 public class AccumuloOutputFormatTest {
   @Test
   public void testBWSettings() throws IOException {
-    ClientInfo clientInfo = createMock(ClientInfo.class);
-    AuthenticationToken token = createMock(AuthenticationToken.class);
-    Properties props = createMock(Properties.class);
-    expect(clientInfo.getAuthenticationToken()).andReturn(token).anyTimes();
-    expect(clientInfo.getProperties()).andReturn(props).anyTimes();
-    replay(clientInfo);
     JobConf job = new JobConf();
 
+    AccumuloClient.ConnectionOptions opts = Accumulo.newClient().to("test", "zk").as("blah",
+        "blah");
+
     // make sure we aren't testing defaults
     final BatchWriterConfig bwDefaults = new BatchWriterConfig();
     assertNotEquals(7654321L, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
@@ -58,13 +50,14 @@ public void testBWSettings() throws IOException {
     bwConfig.setTimeout(9898989L, TimeUnit.MILLISECONDS);
     bwConfig.setMaxWriteThreads(42);
     bwConfig.setMaxMemory(1123581321L);
-    AccumuloOutputFormat.setInfo(job,
-        OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build());
+    opts.batchWriterConfig(bwConfig);
+    AccumuloOutputFormat.configure().clientInfo(opts.info()).store(job);
 
     AccumuloOutputFormat myAOF = new AccumuloOutputFormat() {
       @Override
       public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
-        BatchWriterConfig bwOpts = getBatchWriterOptions(job);
+        BatchWriterConfig bwOpts = OutputConfigurator
+            .getBatchWriterOptions(AccumuloOutputFormat.class, job);
 
         // passive check
         assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS),
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java
index ea748265c5..7aacc1470c 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java
@@ -56,10 +56,9 @@ public void validateConfiguration() throws IOException {
         .addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build();
 
     Job job1 = Job.getInstance();
-    AccumuloFileOutputFormat.setInfo(job1,
-        FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
-            .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig)
-            .summarizers(sc1, sc2).build());
+    AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+        .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+        .summarizers(sc1, sc2).store(job1);
 
     AccumuloConfiguration acuconf = FileOutputConfigurator
         .getAccumuloConfiguration(AccumuloFileOutputFormat.class, job1.getConfiguration());
@@ -88,9 +87,9 @@ public void validateConfiguration() throws IOException {
     samplerConfig.addOption("modulus", "100003");
 
     Job job2 = Job.getInstance();
-    AccumuloFileOutputFormat.setInfo(job2,
-        FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
-            .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig).build());
+    AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+        .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+        .store(job2);
 
     acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class,
         job2.getConfiguration());
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
index 4de275b1c0..dfac630162 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
@@ -38,6 +38,7 @@
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -64,11 +65,10 @@ public static void setupClientInfo() {
   @Test
   public void testSetIterator() throws IOException {
     Job job = Job.getInstance();
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
 
     IteratorSetting is = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(is).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(is).store(job);
     Configuration conf = job.getConfiguration();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     is.write(new DataOutputStream(baos));
@@ -79,16 +79,14 @@ public void testSetIterator() throws IOException {
   @Test
   public void testAddIterator() throws IOException {
     Job job = Job.getInstance();
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
 
     IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
     IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class);
     IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class);
     iter3.addOption("v1", "1");
     iter3.addOption("junk", "\0omg:!\\xyzzy");
-    AccumuloInputFormat.setInfo(job,
-        opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class,
         job.getConfiguration());
@@ -134,9 +132,9 @@ public void testIteratorOptionEncoding() throws Throwable {
     iter1.addOption(key, value);
     Job job = Job.getInstance();
     // also test if reusing options will create duplicate iterators
-    InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
-        .table("test").scanAuths(Authorizations.EMPTY);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).build());
+    InputFormatOptions<Job> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+        .table("test").auths(Authorizations.EMPTY);
+    opts.addIterator(iter1).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class,
         job.getConfiguration());
@@ -147,7 +145,7 @@ public void testIteratorOptionEncoding() throws Throwable {
     IteratorSetting iter2 = new IteratorSetting(1, "iter2", WholeRowIterator.class);
     iter2.addOption(key, value);
     iter2.addOption(key + "2", value);
-    AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).addIterator(iter2).build());
+    opts.addIterator(iter1).addIterator(iter2).store(job);
     list = InputConfigurator.getIterators(AccumuloInputFormat.class, job.getConfiguration());
     assertEquals(2, list.size());
     assertEquals(1, list.get(0).getOptions().size());
@@ -167,9 +165,8 @@ public void testGetIteratorSettings() throws IOException {
     IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class.getName());
     IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class.getName());
     IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class.getName());
-    AccumuloInputFormat.setInfo(job,
-        InputInfo.builder().clientInfo(clientInfo).table("test").scanAuths(Authorizations.EMPTY)
-            .addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
 
     List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class,
         job.getConfiguration());
@@ -203,8 +200,8 @@ public void testSetRegex() throws IOException {
 
     IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
     RegExFilter.setRegexs(is, regex, null, null, null, false);
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
-        .scanAuths(Authorizations.EMPTY).addIterator(is).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .addIterator(is).store(job);
 
     assertEquals(regex, InputConfigurator
         .getIterators(AccumuloInputFormat.class, job.getConfiguration()).get(0).getName());
@@ -219,8 +216,8 @@ public void testEmptyColumnFamily() throws IOException {
     cols.add(new IteratorSetting.Column(new Text(""), new Text("bar")));
     cols.add(new IteratorSetting.Column(new Text(""), new Text("")));
     cols.add(new IteratorSetting.Column(new Text("foo"), new Text("")));
-    AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
-        .scanAuths(Authorizations.EMPTY).fetchColumns(cols).build());
+    AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+        .fetchColumns(cols).store(job);
 
     assertEquals(cols,
         InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job.getConfiguration()));
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
index e698b3a0cc..52aace9a18 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
@@ -16,20 +16,16 @@
  */
 package org.apache.accumulo.hadoop.mapreduce;
 
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import java.io.IOException;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.junit.Test;
@@ -38,14 +34,11 @@
 
   @Test
   public void testBWSettings() throws IOException {
-    ClientInfo clientInfo = createMock(ClientInfo.class);
-    AuthenticationToken token = createMock(AuthenticationToken.class);
-    Properties props = createMock(Properties.class);
-    expect(clientInfo.getAuthenticationToken()).andReturn(token).anyTimes();
-    expect(clientInfo.getProperties()).andReturn(props).anyTimes();
-    replay(clientInfo);
     Job job = Job.getInstance();
 
+    AccumuloClient.ConnectionOptions opts = Accumulo.newClient().to("test", "zk").as("blah",
+        "blah");
+
     // make sure we aren't testing defaults
     final BatchWriterConfig bwDefaults = new BatchWriterConfig();
     assertNotEquals(7654321L, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
@@ -58,13 +51,14 @@ public void testBWSettings() throws IOException {
     bwConfig.setTimeout(9898989L, TimeUnit.MILLISECONDS);
     bwConfig.setMaxWriteThreads(42);
     bwConfig.setMaxMemory(1123581321L);
-    AccumuloOutputFormat.setInfo(job,
-        OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build());
+    opts.batchWriterConfig(bwConfig);
+    AccumuloOutputFormat.configure().clientInfo(opts.info()).store(job);
 
     AccumuloOutputFormat myAOF = new AccumuloOutputFormat() {
       @Override
       public void checkOutputSpecs(JobContext job) throws IOException {
-        BatchWriterConfig bwOpts = AccumuloOutputFormatImpl.getBatchWriterOptions(job);
+        BatchWriterConfig bwOpts = OutputConfigurator
+            .getBatchWriterOptions(AccumuloOutputFormat.class, job.getConfiguration());
 
         // passive check
         assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS),
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
index de722ef467..ba0fb85aab 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
@@ -29,6 +29,7 @@
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -205,6 +206,11 @@ protected ClientInfo getClientInfo() {
         .as("root", ROOT_PASSWORD).info();
   }
 
+  protected ClientInfo getClientInfo(BatchWriterConfig bwConfig) {
+    return Accumulo.newClient().to(getCluster().getInstanceName(), getCluster().getZooKeepers())
+        .as("root", ROOT_PASSWORD).batchWriterConfig(bwConfig).info();
+  }
+
   protected ServerContext getServerContext() {
     return getCluster().getServerContext();
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services