Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/11/26 18:51:51 UTC
[accumulo] branch master updated: MR Improvements Closes #753 #751 (#765)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new c8f5bf1 MR Improvements Closes #753 #751 (#765)
c8f5bf1 is described below
commit c8f5bf1670bb053b4111926179b458b4a58f884b
Author: Mike Miller <mm...@apache.org>
AuthorDate: Mon Nov 26 13:51:46 2018 -0500
MR Improvements Closes #753 #751 (#765)
* Combine MR static method and builder. Closes #753
* Remove BWConfig from map reduce API. Closes #751
* Replace setInfo methods with configure which begins fluent API
* Replace *Info objects with *FormatBuilders
* Migrate tests from core to use new API
Co-authored-by: Keith Turner <kt...@apache.org>
* Limit MR config to Job XOR JobConf (#13)
---
.../hadoop/mapred/AccumuloFileOutputFormat.java | 43 +--
.../hadoop/mapred/AccumuloInputFormat.java | 57 +---
.../hadoop/mapred/AccumuloOutputFormat.java | 29 +-
.../hadoop/mapred/AccumuloRowInputFormat.java | 58 +---
.../hadoop/mapreduce/AccumuloFileOutputFormat.java | 51 +--
.../hadoop/mapreduce/AccumuloInputFormat.java | 59 +---
.../hadoop/mapreduce/AccumuloOutputFormat.java | 33 +-
.../hadoop/mapreduce/AccumuloRowInputFormat.java | 59 +---
.../hadoop/mapreduce/FileOutputFormatBuilder.java | 124 +++++++
.../accumulo/hadoop/mapreduce/FileOutputInfo.java | 192 -----------
.../hadoop/mapreduce/InputFormatBuilder.java | 265 +++++++++++++++
.../accumulo/hadoop/mapreduce/InputInfo.java | 364 ---------------------
.../hadoop/mapreduce/OutputFormatBuilder.java | 84 +++++
.../accumulo/hadoop/mapreduce/OutputInfo.java | 143 --------
.../mapred/AccumuloOutputFormatImpl.java | 31 +-
.../mapreduce/AccumuloOutputFormatImpl.java | 31 +-
.../mapreduce/FileOutputFormatBuilderImpl.java | 155 +++++++++
.../hadoopImpl/mapreduce/FileOutputInfoImpl.java | 159 ---------
.../mapreduce/InputFormatBuilderImpl.java | 247 ++++++++++++++
.../hadoopImpl/mapreduce/InputInfoImpl.java | 267 ---------------
.../mapreduce/OutputFormatBuilderImpl.java | 97 ++++++
.../hadoopImpl/mapreduce/OutputInfoImpl.java | 115 -------
.../lib/MapReduceClientOnDefaultTable.java | 9 +-
.../lib/MapReduceClientOnRequiredTable.java | 10 +-
.../mapreduce/lib/OutputConfigurator.java | 86 ++---
.../its/mapred/AccumuloFileOutputFormatIT.java | 227 +++++++++++++
.../hadoop/its/mapred/AccumuloInputFormatIT.java | 244 ++++++++++++++
.../hadoop/its/mapred/AccumuloOutputFormatIT.java | 229 +++++++++++++
.../its/mapred/AccumuloRowInputFormatIT.java | 5 +-
.../accumulo/hadoop/its/mapred/TokenFileIT.java | 181 ++++++++++
.../its/mapreduce/AccumuloFileOutputFormatIT.java | 236 +++++++++++++
...putFormatIT.java => AccumuloInputFormatIT.java} | 59 ++--
.../its/mapreduce/AccumuloOutputFormatIT.java | 152 +++++++++
.../its/mapreduce/AccumuloRowInputFormatIT.java | 5 +-
.../accumulo/hadoop/its/mapreduce/RowHashIT.java | 174 ++++++++++
.../accumulo/hadoop/its/mapreduce/TokenFileIT.java | 174 ++++++++++
.../mapred/AccumuloFileOutputFormatTest.java | 14 +-
.../hadoop/mapred/AccumuloInputFormatTest.java | 36 +-
.../hadoop/mapred/AccumuloOutputFormatTest.java | 27 +-
.../mapreduce/AccumuloFileOutputFormatTest.java | 13 +-
.../hadoop/mapreduce/AccumuloInputFormatTest.java | 33 +-
.../hadoop/mapreduce/AccumuloOutputFormatTest.java | 26 +-
.../test/functional/ConfigurableMacBase.java | 6 +
43 files changed, 2815 insertions(+), 1794 deletions(-)
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java
index d44219d..ff531b9 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java
@@ -16,24 +16,16 @@
*/
package org.apache.accumulo.hadoop.mapred;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setCompressionType;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setDataBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setFileBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setIndexBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setReplication;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSampler;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSummarizers;
-
import java.io.IOException;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
+import org.apache.accumulo.hadoop.mapreduce.FileOutputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputFormatBuilderImpl;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator;
import org.apache.hadoop.conf.Configuration;
@@ -46,18 +38,7 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
/**
- * This class allows MapReduce jobs to write output in the Accumulo data file format.<br>
- * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important
- * requirement of Accumulo data files.
- *
- * <p>
- * The output path to be created must be specified via {@link #setInfo(JobConf, FileOutputInfo)}
- * using {@link FileOutputInfo#builder()}.outputPath(path). For all available options see
- * {@link FileOutputInfo#builder()}
- * <p>
- * Methods inherited from {@link FileOutputFormat} are not supported and may be ignored or cause
- * failures. Using other Hadoop configuration options that affect the behavior of the underlying
- * files directly in the Job's configuration may work, but are not directly supported at this time.
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat
*
* @since 2.0
*/
@@ -100,22 +81,8 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
/**
* Sets all the information required for this map reduce job.
*/
- public static void setInfo(JobConf job, FileOutputInfo info) {
- setOutputPath(job, info.getOutputPath());
- if (info.getCompressionType().isPresent())
- setCompressionType(job, info.getCompressionType().get());
- if (info.getDataBlockSize().isPresent())
- setDataBlockSize(job, info.getDataBlockSize().get());
- if (info.getFileBlockSize().isPresent())
- setFileBlockSize(job, info.getFileBlockSize().get());
- if (info.getIndexBlockSize().isPresent())
- setIndexBlockSize(job, info.getIndexBlockSize().get());
- if (info.getReplication().isPresent())
- setReplication(job, info.getReplication().get());
- if (info.getSampler().isPresent())
- setSampler(job, info.getSampler().get());
- if (info.getSummarizers().size() > 0)
- setSummarizers(job, info.getSummarizers().toArray(new SummarizerConfiguration[0]));
+ public static FileOutputFormatBuilder.PathParams<JobConf> configure() {
+ return new FileOutputFormatBuilderImpl<JobConf>();
}
}
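For illustration (this driver is not part of the commit), the new mapred API is used like so; the output path, compression type, and block size are placeholder values:

    import org.apache.accumulo.hadoop.mapred.AccumuloFileOutputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class FileOutputDriver {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // configure() starts the fluent chain; outputPath() is the one required
        // parameter, the rest are optional, and store() serializes everything
        // into the JobConf.
        AccumuloFileOutputFormat.configure()
            .outputPath(new Path("/tmp/rfiles"))  // placeholder output directory
            .compression("gz")
            .dataBlockSize(64 * 1024)
            .store(job);
        job.setOutputFormat(AccumuloFileOutputFormat.class);
      }
    }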
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
index a344a4b..92df89d 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
@@ -16,29 +16,16 @@
*/
package org.apache.accumulo.hadoop.mapred;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation;
-
import java.io.IOException;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.format.DefaultFormatter;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
import org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat;
import org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.RecordReaderBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -48,17 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class allows MapReduce jobs to use Accumulo as the source of data. This
- * {@link org.apache.hadoop.mapred.InputFormat} provides keys and values of type {@link Key} and
- * {@link Value} to the Map function.
- *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloInputFormat#setInfo(JobConf, InputInfo)}
- * </ul>
- *
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat
*
* @since 2.0
*/
@@ -115,28 +92,10 @@ public class AccumuloInputFormat implements InputFormat<Key,Value> {
return recordReader;
}
- public static void setInfo(JobConf job, InputInfo info) {
- setClientInfo(job, info.getClientInfo());
- setScanAuthorizations(job, info.getScanAuths());
- setInputTableName(job, info.getTableName());
-
- // all optional values
- if (info.getContext().isPresent())
- setClassLoaderContext(job, info.getContext().get());
- if (info.getRanges().size() > 0)
- setRanges(job, info.getRanges());
- if (info.getIterators().size() > 0)
- InputConfigurator.writeIteratorsToConf(CLASS, job, info.getIterators());
- if (info.getFetchColumns().size() > 0)
- InputConfigurator.fetchColumns(CLASS, job, info.getFetchColumns());
- if (info.getSamplerConfig().isPresent())
- setSamplerConfiguration(job, info.getSamplerConfig().get());
- if (info.getExecutionHints().size() > 0)
- setExecutionHints(job, info.getExecutionHints());
- setAutoAdjustRanges(job, info.isAutoAdjustRanges());
- setScanIsolation(job, info.isScanIsolation());
- setLocalIterators(job, info.isLocalIterators());
- setOfflineTableScan(job, info.isOfflineScan());
- setBatchScan(job, info.isBatchScan());
+ /**
+ * Sets all the information required for this map reduce job.
+ */
+ public static InputFormatBuilder.ClientParams<JobConf> configure() {
+ return new InputFormatBuilderImpl<JobConf>(CLASS);
}
}
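A minimal sketch of a driver using the new mapred input chain (the client properties, table name, and auths are placeholders; ClientInfo.from(Properties) is the factory named in the builder's javadoc):

    import java.util.Properties;

    import org.apache.accumulo.core.client.ClientInfo;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class InputDriver {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        Properties props = new Properties(); // placeholder client properties
        // The required stages come first and in a fixed order: clientInfo(),
        // then table(), then auths(); the optional scan settings follow.
        AccumuloInputFormat.configure()
            .clientInfo(ClientInfo.from(props))
            .table("mytable")                // placeholder table name
            .auths(Authorizations.EMPTY)
            .store(job);
        job.setInputFormat(AccumuloInputFormat.class);
      }
    }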
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
index dca05a4..47864fa 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
@@ -17,11 +17,6 @@
package org.apache.accumulo.hadoop.mapred;
import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.getClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setBatchWriterOptions;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setCreateTables;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setDefaultTableName;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setSimulationMode;
import java.io.IOException;
@@ -32,8 +27,9 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
+import org.apache.accumulo.hadoop.mapreduce.OutputFormatBuilder;
import org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl;
+import org.apache.accumulo.hadoopImpl.mapreduce.OutputFormatBuilderImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -42,15 +38,7 @@ import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.util.Progressable;
/**
- * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat}
- * accepts keys and values of type {@link Text} (for a table name) and {@link Mutation} from the Map
- * and Reduce functions.
- *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloOutputFormat#setInfo(JobConf, OutputInfo)}
- * </ul>
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat
*
* @since 2.0
*/
@@ -81,14 +69,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
}
}
- public static void setInfo(JobConf job, OutputInfo info) {
- setClientInfo(job, info.getClientInfo());
- if (info.getBatchWriterOptions().isPresent())
- setBatchWriterOptions(job, info.getBatchWriterOptions().get());
- if (info.getDefaultTableName().isPresent())
- setDefaultTableName(job, info.getDefaultTableName().get());
- setCreateTables(job, info.isCreateTables());
- setSimulationMode(job, info.isSimulationMode());
+ public static OutputFormatBuilder.ClientParams<JobConf> configure() {
+ return new OutputFormatBuilderImpl<JobConf>();
}
-
}
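The output side follows the same pattern; a sketch under the same placeholder assumptions:

    import java.util.Properties;

    import org.apache.accumulo.core.client.ClientInfo;
    import org.apache.accumulo.hadoop.mapred.AccumuloOutputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class OutputDriver {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        Properties props = new Properties(); // placeholder client properties
        // clientInfo() is the only required stage; defaultTable() and
        // createTables() are optional, and store() writes it all to the JobConf.
        AccumuloOutputFormat.configure()
            .clientInfo(ClientInfo.from(props))
            .defaultTable("mytable")         // placeholder table name
            .createTables()                  // disabled by default
            .store(job);
        job.setOutputFormat(AccumuloOutputFormat.class);
      }
    }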
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
index e823d8a..c1c6cbe 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
@@ -16,20 +16,6 @@
*/
package org.apache.accumulo.hadoop.mapred;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.fetchColumns;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation;
-
import java.io.IOException;
import java.util.Map.Entry;
@@ -37,10 +23,10 @@ import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
import org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat;
import org.apache.accumulo.hadoopImpl.mapred.InputFormatBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -49,22 +35,12 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
/**
- * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat}
- * provides row names as {@link Text} as keys, and a corresponding {@link PeekingIterator} as a
- * value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map
- * function.
- *
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloRowInputFormat#setInfo(JobConf, InputInfo)}
- * </ul>
- *
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * @see org.apache.accumulo.hadoop.mapreduce.AccumuloRowInputFormat
*
* @since 2.0
*/
public class AccumuloRowInputFormat implements InputFormat<Text,PeekingIterator<Entry<Key,Value>>> {
+ private static final Class<AccumuloRowInputFormat> CLASS = AccumuloRowInputFormat.class;
/**
* Gets the splits of the tables that have been set on the job by reading the metadata table for
@@ -123,29 +99,7 @@ public class AccumuloRowInputFormat implements InputFormat<Text,PeekingIterator<
/**
* Sets all the information required for this map reduce job.
*/
- public static void setInfo(JobConf job, InputInfo info) {
- setClientInfo(job, info.getClientInfo());
- setScanAuthorizations(job, info.getScanAuths());
- setInputTableName(job, info.getTableName());
-
- // all optional values
- if (info.getContext().isPresent())
- setClassLoaderContext(job, info.getContext().get());
- if (info.getRanges().size() > 0)
- setRanges(job, info.getRanges());
- if (info.getIterators().size() > 0)
- InputConfigurator.writeIteratorsToConf(AccumuloRowInputFormat.class, job,
- info.getIterators());
- if (info.getFetchColumns().size() > 0)
- fetchColumns(job, info.getFetchColumns());
- if (info.getSamplerConfig().isPresent())
- setSamplerConfiguration(job, info.getSamplerConfig().get());
- if (info.getExecutionHints().size() > 0)
- setExecutionHints(job, info.getExecutionHints());
- setAutoAdjustRanges(job, info.isAutoAdjustRanges());
- setScanIsolation(job, info.isScanIsolation());
- setLocalIterators(job, info.isLocalIterators());
- setOfflineTableScan(job, info.isOfflineScan());
- setBatchScan(job, info.isBatchScan());
+ public static InputFormatBuilder.ClientParams<JobConf> configure() {
+ return new InputFormatBuilderImpl<JobConf>(CLASS);
}
}
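To show what this format hands to the Map function, here is a hypothetical mapred mapper (not part of this commit) that counts the entries in each row; the key is the row id and the value iterates that row's Key/Value pairs:

    import java.io.IOException;
    import java.util.Map.Entry;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.util.PeekingIterator;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class RowCountMapper extends MapReduceBase
        implements Mapper<Text,PeekingIterator<Entry<Key,Value>>,Text,LongWritable> {
      @Override
      public void map(Text row, PeekingIterator<Entry<Key,Value>> entries,
          OutputCollector<Text,LongWritable> out, Reporter reporter) throws IOException {
        long count = 0;
        // Drain the row's entries; each element is one Key/Value pair in the row.
        while (entries.hasNext()) {
          entries.next();
          count++;
        }
        out.collect(row, new LongWritable(count));
      }
    }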
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java
index 26f559e..2f869e9 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java
@@ -16,23 +16,15 @@
*/
package org.apache.accumulo.hadoop.mapreduce;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setCompressionType;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setDataBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setFileBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setIndexBlockSize;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setReplication;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSampler;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSummarizers;
-
import java.io.IOException;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputFormatBuilderImpl;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator;
import org.apache.hadoop.conf.Configuration;
@@ -45,16 +37,21 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* This class allows MapReduce jobs to write output in the Accumulo data file format.<br>
* Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important
- * requirement of Accumulo data files.
+ * requirement of Accumulo data files. The output path to be created must be specified via
+ * {@link #configure()}, which uses a fluent API. For example:
+ *
+ * <pre>
+ * AccumuloFileOutputFormat.configure()
+ * .outputPath(path)
+ * .fileBlockSize(b)
+ * .compression(type)
+ * .summarizers(sc1, sc2).store(job);
+ * </pre>
*
- * <p>
- * The output path to be created must be specified via {@link #setInfo(Job, FileOutputInfo)} using
- * {@link FileOutputInfo#builder()}.outputPath(path). For all available options see
- * {@link FileOutputInfo#builder()}
- * <p>
- * Methods inherited from {@link FileOutputFormat} are not supported and may be ignored or cause
- * failures. Using other Hadoop configuration options that affect the behavior of the underlying
- * files directly in the Job's configuration may work, but are not directly supported at this time.
+ * For all available options see {@link FileOutputFormatBuilder}. Methods inherited from
+ * {@link FileOutputFormat} are not supported and may be ignored or cause failures. Using other
+ * Hadoop configuration options that affect the behavior of the underlying files directly in the
+ * Job's configuration may work, but are not directly supported at this time.
*
* @since 2.0
*/
@@ -95,22 +92,8 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
/**
* Sets all the information required for this map reduce job.
*/
- public static void setInfo(Job job, FileOutputInfo info) {
- setOutputPath(job, info.getOutputPath());
- if (info.getCompressionType().isPresent())
- setCompressionType(job, info.getCompressionType().get());
- if (info.getDataBlockSize().isPresent())
- setDataBlockSize(job, info.getDataBlockSize().get());
- if (info.getFileBlockSize().isPresent())
- setFileBlockSize(job, info.getFileBlockSize().get());
- if (info.getIndexBlockSize().isPresent())
- setIndexBlockSize(job, info.getIndexBlockSize().get());
- if (info.getReplication().isPresent())
- setReplication(job, info.getReplication().get());
- if (info.getSampler().isPresent())
- setSampler(job, info.getSampler().get());
- if (info.getSummarizers().size() > 0)
- setSummarizers(job, info.getSummarizers().toArray(new SummarizerConfiguration[0]));
+ public static FileOutputFormatBuilder.PathParams<Job> configure() {
+ return new FileOutputFormatBuilderImpl<Job>();
}
}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
index fa64a5a..3c391a8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
@@ -16,19 +16,6 @@
*/
package org.apache.accumulo.hadoop.mapreduce;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setScanIsolation;
-
import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
@@ -38,7 +25,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.format.DefaultFormatter;
import org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat;
import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -50,15 +37,20 @@ import org.slf4j.LoggerFactory;
/**
* This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat}
- * provides keys and values of type {@link Key} and {@link Value} to the Map function.
- *
- * The user must specify the following via static configurator method:
+ * provides keys and values of type {@link Key} and {@link Value} to the Map function. Configure the
+ * job using the {@link #configure()} method, which provides a fluent API. For example:
*
- * <ul>
- * <li>{@link AccumuloInputFormat#setInfo(Job, InputInfo)}
- * </ul>
+ * <pre>
+ * AccumuloInputFormat.configure().clientInfo(info).table(name).auths(auths) // required
+ * .addIterator(iter1).ranges(ranges).fetchColumns(columns).executionHints(hints)
+ * .samplerConfiguration(sampleConf).disableAutoAdjustRanges() // enabled by default
+ * .scanIsolation() // not available with batchScan()
+ * .offlineScan() // not available with batchScan()
+ * .store(job);
+ * </pre>
*
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * For descriptions of all options see
+ * {@link org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions}
*
* @since 2.0
*/
@@ -103,28 +95,7 @@ public class AccumuloInputFormat extends InputFormat<Key,Value> {
/**
* Sets all the information required for this map reduce job.
*/
- public static void setInfo(Job job, InputInfo info) {
- setClientInfo(job, info.getClientInfo());
- setScanAuthorizations(job, info.getScanAuths());
- setInputTableName(job, info.getTableName());
-
- // all optional values
- if (info.getContext().isPresent())
- setClassLoaderContext(job, info.getContext().get());
- if (info.getRanges().size() > 0)
- setRanges(job, info.getRanges());
- if (info.getIterators().size() > 0)
- InputConfigurator.writeIteratorsToConf(CLASS, job.getConfiguration(), info.getIterators());
- if (info.getFetchColumns().size() > 0)
- InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), info.getFetchColumns());
- if (info.getSamplerConfig().isPresent())
- setSamplerConfiguration(job, info.getSamplerConfig().get());
- if (info.getExecutionHints().size() > 0)
- setExecutionHints(job, info.getExecutionHints());
- setAutoAdjustRanges(job, info.isAutoAdjustRanges());
- setScanIsolation(job, info.isScanIsolation());
- setLocalIterators(job, info.isLocalIterators());
- setOfflineTableScan(job, info.isOfflineScan());
- setBatchScan(job, info.isBatchScan());
+ public static InputFormatBuilder.ClientParams<Job> configure() {
+ return new InputFormatBuilderImpl<>(CLASS);
}
}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
index a285988..565de0e 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
@@ -17,11 +17,6 @@
package org.apache.accumulo.hadoop.mapreduce;
import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.getClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setBatchWriterOptions;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setCreateTables;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setDefaultTableName;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setSimulationMode;
import java.io.IOException;
@@ -33,6 +28,7 @@ import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl;
+import org.apache.accumulo.hadoopImpl.mapreduce.OutputFormatBuilderImpl;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
@@ -45,13 +41,15 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
/**
* This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat}
* accepts keys and values of type {@link Text} (for a table name) and {@link Mutation} from the Map
- * and Reduce functions.
+ * and Reduce functions. Configure the job with the fluent API using
+ * {@link AccumuloOutputFormat#configure()}. Here is an example with all possible options:
*
- * The user must specify the following via static configurator method:
- *
- * <ul>
- * <li>{@link AccumuloOutputFormat#setInfo(Job, OutputInfo)}
- * </ul>
+ * <pre>
+ * AccumuloOutputFormat.configure().clientInfo(clientInfo).batchWriterOptions(bwConfig)
+ * .defaultTable(name).createTables() // disabled by default
+ * .simulationMode() // disabled by default
+ * .store(job);
+ * </pre>
*
* @since 2.0
*/
@@ -88,14 +86,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
}
}
- public static void setInfo(Job job, OutputInfo info) {
- setClientInfo(job, info.getClientInfo());
- if (info.getBatchWriterOptions().isPresent())
- setBatchWriterOptions(job, info.getBatchWriterOptions().get());
- if (info.getDefaultTableName().isPresent())
- setDefaultTableName(job, info.getDefaultTableName().get());
- setCreateTables(job, info.isCreateTables());
- setSimulationMode(job, info.isSimulationMode());
+ /**
+ * Sets all the information required for this map reduce job.
+ */
+ public static OutputFormatBuilder.ClientParams<Job> configure() {
+ return new OutputFormatBuilderImpl<Job>();
}
}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java
index d7bba69..899eb28 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java
@@ -16,19 +16,6 @@
*/
package org.apache.accumulo.hadoop.mapreduce;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClassLoaderContext;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClientInfo;
-import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setScanAuthorizations;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setAutoAdjustRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setBatchScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setExecutionHints;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setInputTableName;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setLocalIterators;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setOfflineTableScan;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setRanges;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setSamplerConfiguration;
-import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setScanIsolation;
-
import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
@@ -39,7 +26,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat;
import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBuilderImpl;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,15 +39,20 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
* This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat}
* provides row names as {@link Text} as keys, and a corresponding {@link PeekingIterator} as a
* value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map
- * function.
- *
- * The user must specify the following via static configurator method:
+ * function. Configure the job using the {@link #configure()} method, which provides a fluent API.
+ * For example:
*
- * <ul>
- * <li>{@link AccumuloRowInputFormat#setInfo(Job, InputInfo)}
- * </ul>
+ * <pre>
+ * AccumuloRowInputFormat.configure().clientInfo(info).table(name).auths(auths) // required
+ * .addIterator(iter1).ranges(ranges).fetchColumns(columns).executionHints(hints)
+ * .samplerConfiguration(sampleConf).disableAutoAdjustRanges() // enabled by default
+ * .scanIsolation() // not available with batchScan()
+ * .offlineScan() // not available with batchScan()
+ * .store(job);
+ * </pre>
*
- * For required parameters and all available options use {@link InputInfo#builder()}
+ * For descriptions of all options see
+ * {@link org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions}
*
* @since 2.0
*/
@@ -111,28 +103,7 @@ public class AccumuloRowInputFormat extends InputFormat<Text,PeekingIterator<Ent
/**
* Sets all the information required for this map reduce job.
*/
- public static void setInfo(Job job, InputInfo info) {
- setClientInfo(job, info.getClientInfo());
- setScanAuthorizations(job, info.getScanAuths());
- setInputTableName(job, info.getTableName());
-
- // all optional values
- if (info.getContext().isPresent())
- setClassLoaderContext(job, info.getContext().get());
- if (info.getRanges().size() > 0)
- setRanges(job, info.getRanges());
- if (info.getIterators().size() > 0)
- InputConfigurator.writeIteratorsToConf(CLASS, job.getConfiguration(), info.getIterators());
- if (info.getFetchColumns().size() > 0)
- InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), info.getFetchColumns());
- if (info.getSamplerConfig().isPresent())
- setSamplerConfiguration(job, info.getSamplerConfig().get());
- if (info.getExecutionHints().size() > 0)
- setExecutionHints(job, info.getExecutionHints());
- setAutoAdjustRanges(job, info.isAutoAdjustRanges());
- setScanIsolation(job, info.isScanIsolation());
- setLocalIterators(job, info.isLocalIterators());
- setOfflineTableScan(job, info.isOfflineScan());
- setBatchScan(job, info.isBatchScan());
+ public static InputFormatBuilder.ClientParams<Job> configure() {
+ return new InputFormatBuilderImpl<Job>(CLASS);
}
}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputFormatBuilder.java
new file mode 100644
index 0000000..d4e4fc8
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputFormatBuilder.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Builder for all the information needed for the Map Reduce job. Fluent API used by
+ * {@link AccumuloFileOutputFormat#configure()}
+ *
+ * @since 2.0
+ */
+public interface FileOutputFormatBuilder {
+ /**
+ * Required params for builder
+ *
+ * @since 2.0
+ */
+ interface PathParams<T> {
+ /**
+ * Set the Path of the output directory for the map-reduce job.
+ */
+ OutputOptions<T> outputPath(Path path);
+ }
+
+ /**
+ * Options for builder
+ *
+ * @since 2.0
+ */
+ interface OutputOptions<T> {
+ /**
+ * Sets the compression type to use for data blocks, overriding the default. Specifying a
+ * compression may require additional libraries to be available to your Job.
+ *
+ * @param compressionType
+ * one of "none", "gz", "lzo", or "snappy"
+ */
+ OutputOptions<T> compression(String compressionType);
+
+ /**
+ * Sets the size for data blocks within each file.<br>
+ * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed
+ * as a group.
+ *
+ * <p>
+ * Making this value smaller may increase seek performance, but at the cost of increasing the
+ * size of the indexes (which can also affect seek performance).
+ *
+ * @param dataBlockSize
+ * the block size, in bytes
+ */
+ OutputOptions<T> dataBlockSize(long dataBlockSize);
+
+ /**
+ * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by
+ * the underlying file system.
+ *
+ * @param fileBlockSize
+ * the block size, in bytes
+ */
+ OutputOptions<T> fileBlockSize(long fileBlockSize);
+
+ /**
+ * Sets the size for index blocks within each file; smaller blocks mean a deeper index
+ * hierarchy within the file, while larger blocks mean a shallower index hierarchy within the
+ * file. This can affect the performance of queries.
+ *
+ * @param indexBlockSize
+ * the block size, in bytes
+ */
+ OutputOptions<T> indexBlockSize(long indexBlockSize);
+
+ /**
+ * Sets the file system replication factor for the resulting file, overriding the file system
+ * default.
+ *
+ * @param replication
+ * the number of replicas for produced files
+ */
+ OutputOptions<T> replication(int replication);
+
+ /**
+ * Specify a sampler to be used when writing out data. This will result in the output file
+ * having sample data.
+ *
+ * @param samplerConfig
+ * The configuration for creating sample data in the output file.
+ */
+ OutputOptions<T> sampler(SamplerConfiguration samplerConfig);
+
+ /**
+ * Specifies a list of summarizer configurations to create summary data in the output file. Each
+ * key/value pair written will be passed to the configured
+ * {@link org.apache.accumulo.core.client.summary.Summarizer}s.
+ *
+ * @param summarizerConfigs
+ * summarizer configurations
+ */
+ OutputOptions<T> summarizers(SummarizerConfiguration... summarizerConfigs);
+
+ /**
+ * Finish configuring, verify and serialize options into the Job or JobConf
+ */
+ void store(T job);
+ }
+
+}
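A design note on the staged interfaces: because configure() returns PathParams, whose only method is outputPath(), the required parameter is enforced at compile time; the optional settings only become visible on the OutputOptions it returns. A sketch (the path and replication values are placeholders):

    import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    public class StagedBuilderSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // outputPath() must come first; only then do the optional
        // OutputOptions methods become available.
        AccumuloFileOutputFormat.configure()
            .outputPath(new Path("/tmp/out"))  // placeholder path
            .replication(3)
            .store(job);
        // AccumuloFileOutputFormat.configure().compression("gz"); // would not compile
      }
    }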
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java
deleted file mode 100644
index 70b6043..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoop.mapreduce;
-
-import java.util.Collection;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
-import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputInfoImpl;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Object containing all the information needed for the Map Reduce job. This object is passed to
- * {@link AccumuloFileOutputFormat#setInfo(Job, FileOutputInfo)}. It uses a fluent API like so:
- *
- * <pre>
- * FileOutputInfo.builder()
- * .outputPath(path)
- * .fileBlockSize(b)
- * .compressionType(type)
- * .summarizers(sc1, sc2).build());
- * </pre>
- *
- * @since 2.0
- */
-public interface FileOutputInfo {
-
- /**
- * @return the output path set using FileOutputInfo.builder()...outputPath(path)
- */
- public Path getOutputPath();
-
- /**
- * @return the compression if set using FileOutputInfo.builder()...compressionType(type)
- */
- public Optional<String> getCompressionType();
-
- /**
- * @return the data block size if set using FileOutputInfo.builder()...dataBlockSize(size)
- */
- public Optional<Long> getDataBlockSize();
-
- /**
- * @return the file block size if set using FileOutputInfo.builder()...fileBlockSize(size)
- */
- public Optional<Long> getFileBlockSize();
-
- /**
- * @return the index block size if set using FileOutputInfo.builder()...indexBlockSize(size)
- */
- public Optional<Long> getIndexBlockSize();
-
- /**
- * @return the replication if set using FileOutputInfo.builder()...replication(num)
- */
- public Optional<Integer> getReplication();
-
- /**
- * @return the SamplerConfiguration if set using FileOutputInfo.builder()...sampler(conf)
- */
- public Optional<SamplerConfiguration> getSampler();
-
- /**
- * @return the summarizers set using FileOutputInfo.builder()...summarizers(conf1, conf2...)
- */
- public Collection<SummarizerConfiguration> getSummarizers();
-
- /**
- * @return builder for creating a {@link FileOutputInfo}
- */
- public static FileOutputInfoBuilder.PathParams builder() {
- return new FileOutputInfoImpl.FileOutputInfoBuilderImpl();
- }
-
- /**
- * Fluent API builder for FileOutputInfo
- *
- * @since 2.0
- */
- interface FileOutputInfoBuilder {
-
- /**
- * Required params for builder
- *
- * @since 2.0
- */
- interface PathParams {
- /**
- * Set the Path of the output directory for the map-reduce job.
- */
- OutputOptions outputPath(Path path);
- }
-
- /**
- * Options for builder
- *
- * @since 2.0
- */
- interface OutputOptions {
- /**
- * Sets the compression type to use for data blocks, overriding the default. Specifying a
- * compression may require additional libraries to be available to your Job.
- *
- * @param compressionType
- * one of "none", "gz", "lzo", or "snappy"
- */
- OutputOptions compressionType(String compressionType);
-
- /**
- * Sets the size for data blocks within each file.<br>
- * Data blocks are a span of key/value pairs stored in the file that are compressed and
- * indexed as a group.
- *
- * <p>
- * Making this value smaller may increase seek performance, but at the cost of increasing the
- * size of the indexes (which can also affect seek performance).
- *
- * @param dataBlockSize
- * the block size, in bytes
- */
- OutputOptions dataBlockSize(long dataBlockSize);
-
- /**
- * Sets the size for file blocks in the file system; file blocks are managed, and replicated,
- * by the underlying file system.
- *
- * @param fileBlockSize
- * the block size, in bytes
- */
- OutputOptions fileBlockSize(long fileBlockSize);
-
- /**
- * Sets the size for index blocks within each file; smaller blocks means a deeper index
- * hierarchy within the file, while larger blocks mean a more shallow index hierarchy within
- * the file. This can affect the performance of queries.
- *
- * @param indexBlockSize
- * the block size, in bytes
- */
- OutputOptions indexBlockSize(long indexBlockSize);
-
- /**
- * Sets the file system replication factor for the resulting file, overriding the file system
- * default.
- *
- * @param replication
- * the number of replicas for produced files
- */
- OutputOptions replication(int replication);
-
- /**
- * Specify a sampler to be used when writing out data. This will result in the output file
- * having sample data.
- *
- * @param samplerConfig
- * The configuration for creating sample data in the output file.
- */
- OutputOptions sampler(SamplerConfiguration samplerConfig);
-
- /**
- * Specifies a list of summarizer configurations to create summary data in the output file.
- * Each Key Value written will be passed to the configured
- * {@link org.apache.accumulo.core.client.summary.Summarizer}'s.
- *
- * @param summarizerConfigs
- * summarizer configurations
- */
- OutputOptions summarizers(SummarizerConfiguration... summarizerConfigs);
-
- /**
- * @return newly created {@link FileOutputInfo}
- */
- FileOutputInfo build();
- }
- }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
new file mode 100644
index 0000000..1751d9f
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+
+/**
+ * Builder for all the information needed for the Map Reduce job. Fluent API used by
+ * {@link AccumuloInputFormat#configure()}
+ *
+ * @since 2.0
+ */
+public interface InputFormatBuilder {
+ /**
+ * Required params for builder
+ *
+ * @since 2.0
+ */
+ interface ClientParams<T> {
+ /**
+ * Set the connection information needed to communicate with Accumulo in this job. The
+ * ClientInfo param can be created using {@link ClientInfo#from(Properties)}.
+ *
+ * @param clientInfo
+ * Accumulo connection information
+ */
+ TableParams<T> clientInfo(ClientInfo clientInfo);
+ }
+
+ /**
+ * Required params for builder
+ *
+ * @since 2.0
+ */
+ interface TableParams<T> {
+ /**
+ * Sets the name of the input table over which this job will scan.
+ *
+ * @param tableName
+ * the name of the table to read from
+ */
+ AuthsParams<T> table(String tableName);
+ }
+
+ /**
+ * Required params for builder
+ *
+ * @since 2.0
+ */
+ interface AuthsParams<T> {
+ /**
+ * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations.
+ * If no authorizations are needed, use {@link Authorizations#EMPTY}.
+ *
+ * @param auths
+ * the user's authorizations
+ */
+ InputFormatOptions<T> auths(Authorizations auths);
+ }
+
+ /**
+ * Options for batch scan
+ *
+ * @since 2.0
+ */
+ interface BatchScanOptions<T> {
+ /**
+ * Finish configuring, verify and store options into the JobConf or Job
+ */
+ void store(T t);
+ }
+
+ /**
+ * Options for scan
+ *
+ * @since 2.0
+ */
+ interface ScanOptions<T> extends BatchScanOptions<T> {
+ /**
+ * @see InputFormatOptions#scanIsolation()
+ */
+ ScanOptions<T> scanIsolation();
+
+ /**
+ * @see InputFormatOptions#localIterators()
+ */
+ ScanOptions<T> localIterators();
+
+ /**
+ * @see InputFormatOptions#offlineScan()
+ */
+ ScanOptions<T> offlineScan();
+ }
+
+ /**
+ * Optional values to set using fluent API
+ *
+ * @since 2.0
+ */
+ interface InputFormatOptions<T> {
+ /**
+ * Sets the name of the classloader context on this scanner
+ *
+ * @param context
+ * name of the classloader context
+ */
+ InputFormatOptions<T> classLoaderContext(String context);
+
+ /**
+ * Sets the input ranges to scan for the single input table associated with this job.
+ *
+ * @param ranges
+ * the ranges that will be mapped over
+ * @see TableOperations#splitRangeByTablets(String, Range, int)
+ */
+ InputFormatOptions<T> ranges(Collection<Range> ranges);
+
+ /**
+ * Restricts the columns that will be mapped over for this job for the default input table.
+ *
+ * @param fetchColumns
+ * a collection of IteratorSetting.Column objects corresponding to column family and
+ * column qualifier. If the column qualifier is null, the entire column family is
+ * selected. An empty set is the default and is equivalent to scanning all columns.
+ */
+ InputFormatOptions<T> fetchColumns(Collection<IteratorSetting.Column> fetchColumns);
+
+ /**
+ * Encode an iterator on the single input table for this job. It is safe to call this method
+ * multiple times. If an iterator is added with the same name, it will be overridden.
+ *
+ * @param cfg
+ * the configuration of the iterator
+ */
+ InputFormatOptions<T> addIterator(IteratorSetting cfg);
+
+ /**
+ * Set these execution hints on scanners created for input splits. See
+ * {@link ScannerBase#setExecutionHints(java.util.Map)}
+ */
+ InputFormatOptions<T> executionHints(Map<String,String> hints);
+
+ /**
+ * Causes the input format to read sample data. If the sample data was created using a
+ * different configuration, or a table's sampler configuration changes while reading data,
+ * then the input format will throw an error.
+ *
+ * @param samplerConfig
+ * The sampler configuration that the sample data must have been created with in order
+ * for reading sample data to succeed.
+ *
+ * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
+ */
+ InputFormatOptions<T> samplerConfiguration(SamplerConfiguration samplerConfig);
+
+ /**
+ * Disables the automatic adjustment of ranges for this job. This feature merges overlapping
+ * ranges, then splits them to align with tablet boundaries. Disabling this feature will cause
+ * exactly one Map task to be created for each specified range. Disabling has no effect for
+ * batch scans, as they always automatically adjust ranges.
+ * <p>
+ * By default, this feature is <b>enabled</b>.
+ *
+ * @see #ranges(Collection)
+ */
+ InputFormatOptions<T> disableAutoAdjustRanges();
+
+ /**
+ * Enables the use of the {@link IsolatedScanner} in this job.
+ * <p>
+ * By default, this feature is <b>disabled</b>.
+ */
+ ScanOptions<T> scanIsolation();
+
+ /**
+ * Enables the use of the {@link ClientSideIteratorScanner} in this job. This feature will cause
+ * the iterator stack to be constructed within the Map task, rather than within the Accumulo
+ * TServer. To use this feature, all classes needed for those iterators must be available on the
+ * classpath for the task.
+ * <p>
+ * By default, this feature is <b>disabled</b>.
+ */
+ ScanOptions<T> localIterators();
+
+ /**
+ * Enable reading offline tables. By default, this feature is disabled and only online tables
+ * are scanned. This will make the map reduce job directly read the table's files. If the table
+ * is not offline, then the job will fail. If the table comes online during the map reduce job,
+ * it is likely that the job will fail.
+ * <p>
+ * To use this option, the map reduce user will need access to read the Accumulo directory in
+ * HDFS.
+ * <p>
+ * Reading the offline table will create the scan-time iterator stack in the map process, so
+ * any iterators that are configured for the table will need to be on the mapper's classpath.
+ * <p>
+ * One way to use this feature is to clone a table, take the clone offline, and use the clone as
+ * the input table for a map reduce job. If you plan to map reduce over the data many times, it
+ * may be better to compact the table, clone it, take it offline, and use the clone for all
+ * map reduce jobs. The reason to do this is that compaction will reduce each tablet in the
+ * table to one file, and it is faster to read from one file.
+ * <p>
+ * There are two possible advantages to reading a table's files directly out of HDFS. First,
+ * you may see better read performance. Second, it will support speculative execution better.
+ * When reading an online table, speculative execution can put more load on an already slow tablet
+ * server.
+ * <p>
+ * By default, this feature is <b>disabled</b>.
+ */
+ ScanOptions<T> offlineScan();
+
+ /**
+ * Enables the use of the {@link org.apache.accumulo.core.client.BatchScanner} in this job.
+ * Using this feature will group Ranges by their source tablet, producing an InputSplit per
+ * tablet rather than per Range. This batching helps to reduce overhead when querying a large
+ * number of small ranges (e.g. when doing quad-tree decomposition for spatial queries).
+ * <p>
+ * In order to achieve good locality of InputSplits this option always clips the input Ranges to
+ * tablet boundaries. This may result in one input Range contributing to several InputSplits.
+ * <p>
+ * Note: calls to {@link #disableAutoAdjustRanges()} are ignored when BatchScan is enabled.
+ * <p>
+ * This configuration is incompatible with:
+ * <ul>
+ * <li>{@link #offlineScan()}</li>
+ * <li>{@link #localIterators()}</li>
+ * <li>{@link #scanIsolation()}</li>
+ * </ul>
+ * <p>
+ * By default, this feature is <b>disabled</b>.
+ */
+ BatchScanOptions<T> batchScan();
+
+ /**
+ * Finish configuring; verify and serialize the options into the JobConf or Job.
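+ * <p>
+ * For example (a sketch; the {@code clientInfo} and {@code job} variables are assumed to be
+ * created elsewhere):
+ *
+ * <pre>
+ * AccumuloInputFormat.configure().clientInfo(clientInfo).table("table")
+ *     .auths(Authorizations.EMPTY).store(job);
+ * </pre>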
+ */
+ void store(T j);
+ }
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java
deleted file mode 100644
index d443ce4..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoop.mapreduce;
-
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.ClientSideIteratorScanner;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.ScannerBase;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.hadoopImpl.mapreduce.InputInfoImpl;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Object containing all the information needed for the Map Reduce job. This object is passed to the
- * different Input Format types see {@link AccumuloInputFormat#setInfo(Job, InputInfo)}. It uses a
- * fluent API:
- *
- * <pre>
- * InputInfo.builder().clientInfo(info).table(name).scanAuths(auths).build();
- *
- * InputInfo.builder().clientProperties(props).table(name).scanAuths(auths).addIterator(cfg)
- * .executionHints(hints).build();
- * </pre>
- *
- * @since 2.0
- */
-public interface InputInfo {
- /**
- * @return the table name set using InputInfo.builder()...table(name)
- */
- String getTableName();
-
- /**
- * @return the client info set using InputInfo.builder().clientInfo(info)
- */
- ClientInfo getClientInfo();
-
- /**
- * @return the scan authorizations set using InputInfo.builder()...scanAuths(auths)
- */
- Authorizations getScanAuths();
-
- /**
- * @return the context set using InputInfo.builder()...classLoaderContext(context)
- */
- Optional<String> getContext();
-
- /**
- * @return the Ranges set using InputInfo.builder()...ranges(ranges)
- */
- Collection<Range> getRanges();
-
- /**
- * @return the ColumnFamily,ColumnQualifier Pairs set using
- * InputInfo.builder()...fetchColumns(cfcqPairs)
- */
- Collection<IteratorSetting.Column> getFetchColumns();
-
- /**
- * @return the collection of IteratorSettings set using InputInfo.builder()...addIterator(cfg)
- */
- Collection<IteratorSetting> getIterators();
-
- /**
- * @return the SamplerConfiguration set using InputInfo.builder()...samplerConfiguration(cfg)
- */
- Optional<SamplerConfiguration> getSamplerConfig();
-
- /**
- * @return the Execution Hints set using InputInfo.builder()...executionHints(hints)
- */
- Map<String,String> getExecutionHints();
-
- /**
- * @return boolean if auto adjusting ranges or not
- */
- boolean isAutoAdjustRanges();
-
- /**
- * @return boolean if using scan isolation or not
- */
- boolean isScanIsolation();
-
- /**
- * @return boolean if using local iterators or not
- */
- boolean isLocalIterators();
-
- /**
- * @return boolean if using offline scan or not
- */
- boolean isOfflineScan();
-
- /**
- * @return boolean if using batch scanner or not
- */
- boolean isBatchScan();
-
- /**
- * Builder starting point for map reduce input format information.
- */
- static InputInfoBuilder.ClientParams builder() {
- return new InputInfoImpl.InputInfoBuilderImpl();
- }
-
- /**
- * Required build values to be set.
- *
- * @since 2.0
- */
- interface InputInfoBuilder {
- /**
- * Required params for builder
- *
- * @since 2.0
- */
- interface ClientParams {
- /**
- * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
- * param can be created using {@link ClientInfo#from(Path)} or
- * {@link ClientInfo#from(Properties)}
- *
- * @param clientInfo
- * Accumulo connection information
- */
- TableParams clientInfo(ClientInfo clientInfo);
- }
-
- /**
- * Required params for builder
- *
- * @since 2.0
- */
- interface TableParams {
- /**
- * Sets the name of the input table, over which this job will scan.
- *
- * @param tableName
- * the table to use when the tablename is null in the write call
- */
- AuthsParams table(String tableName);
- }
-
- /**
- * Required params for builder
- *
- * @since 2.0
- */
- interface AuthsParams {
- /**
- * Sets the {@link Authorizations} used to scan. Must be a subset of the user's
- * authorizations. If none present use {@link Authorizations#EMPTY}
- *
- * @param auths
- * the user's authorizations
- */
- InputFormatOptions scanAuths(Authorizations auths);
- }
-
- /**
- * Options for batch scan
- *
- * @since 2.0
- */
- interface BatchScanOptions {
- /**
- * @return newly created {@link InputInfo}
- */
- InputInfo build();
- }
-
- /**
- * Options for scan
- *
- * @since 2.0
- */
- interface ScanOptions extends BatchScanOptions {
- /**
- * @see InputFormatOptions#scanIsolation()
- */
- ScanOptions scanIsolation();
-
- /**
- * @see InputFormatOptions#localIterators()
- */
- ScanOptions localIterators();
-
- /**
- * @see InputFormatOptions#offlineScan()
- */
- ScanOptions offlineScan();
- }
-
- /**
- * Optional values to set using fluent API
- *
- * @since 2.0
- */
- interface InputFormatOptions {
- /**
- * Sets the name of the classloader context on this scanner
- *
- * @param context
- * name of the classloader context
- */
- InputFormatOptions classLoaderContext(String context);
-
- /**
- * Sets the input ranges to scan for the single input table associated with this job.
- *
- * @param ranges
- * the ranges that will be mapped over
- * @see TableOperations#splitRangeByTablets(String, Range, int)
- */
- InputFormatOptions ranges(Collection<Range> ranges);
-
- /**
- * Restricts the columns that will be mapped over for this job for the default input table.
- *
- * @param fetchColumns
- * a collection of IteratorSetting.Column objects corresponding to column family and
- * column qualifier. If the column qualifier is null, the entire column family is
- * selected. An empty set is the default and is equivalent to scanning all columns.
- */
- InputFormatOptions fetchColumns(Collection<IteratorSetting.Column> fetchColumns);
-
- /**
- * Encode an iterator on the single input table for this job. It is safe to call this method
- * multiple times. If an iterator is added with the same name, it will be overridden.
- *
- * @param cfg
- * the configuration of the iterator
- */
- InputFormatOptions addIterator(IteratorSetting cfg);
-
- /**
- * Set these execution hints on scanners created for input splits. See
- * {@link ScannerBase#setExecutionHints(java.util.Map)}
- */
- InputFormatOptions executionHints(Map<String,String> hints);
-
- /**
- * Causes input format to read sample data. If sample data was created using a different
- * configuration or a tables sampler configuration changes while reading data, then the input
- * format will throw an error.
- *
- * @param samplerConfig
- * The sampler configuration that sample must have been created with inorder for
- * reading sample data to succeed.
- *
- * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
- */
- InputFormatOptions samplerConfiguration(SamplerConfiguration samplerConfig);
-
- /**
- * Disables the automatic adjustment of ranges for this job. This feature merges overlapping
- * ranges, then splits them to align with tablet boundaries. Disabling this feature will cause
- * exactly one Map task to be created for each specified range. Disabling has no effect for
- * batch scans at it will always automatically adjust ranges.
- * <p>
- * By default, this feature is <b>enabled</b>.
- *
- * @see #ranges(Collection)
- */
- InputFormatOptions disableAutoAdjustRanges();
-
- /**
- * Enables the use of the {@link IsolatedScanner} in this job.
- * <p>
- * By default, this feature is <b>disabled</b>.
- */
- ScanOptions scanIsolation();
-
- /**
- * Enables the use of the {@link ClientSideIteratorScanner} in this job. This feature will
- * cause the iterator stack to be constructed within the Map task, rather than within the
- * Accumulo TServer. To use this feature, all classes needed for those iterators must be
- * available on the classpath for the task.
- * <p>
- * By default, this feature is <b>disabled</b>.
- */
- ScanOptions localIterators();
-
- /**
- * Enable reading offline tables. By default, this feature is disabled and only online tables
- * are scanned. This will make the map reduce job directly read the table's files. If the
- * table is not offline, then the job will fail. If the table comes online during the map
- * reduce job, it is likely that the job will fail.
- * <p>
- * To use this option, the map reduce user will need access to read the Accumulo directory in
- * HDFS.
- * <p>
- * Reading the offline table will create the scan time iterator stack in the map process. So
- * any iterators that are configured for the table will need to be on the mapper's classpath.
- * <p>
- * One way to use this feature is to clone a table, take the clone offline, and use the clone
- * as the input table for a map reduce job. If you plan to map reduce over the data many
- * times, it may be better to the compact the table, clone it, take it offline, and use the
- * clone for all map reduce jobs. The reason to do this is that compaction will reduce each
- * tablet in the table to one file, and it is faster to read from one file.
- * <p>
- * There are two possible advantages to reading a tables file directly out of HDFS. First, you
- * may see better read performance. Second, it will support speculative execution better. When
- * reading an online table speculative execution can put more load on an already slow tablet
- * server.
- * <p>
- * By default, this feature is <b>disabled</b>.
- */
- ScanOptions offlineScan();
-
- /**
- * Enables the use of the {@link org.apache.accumulo.core.client.BatchScanner} in this job.
- * Using this feature will group Ranges by their source tablet, producing an InputSplit per
- * tablet rather than per Range. This batching helps to reduce overhead when querying a large
- * number of small ranges. (ex: when doing quad-tree decomposition for spatial queries)
- * <p>
- * In order to achieve good locality of InputSplits this option always clips the input Ranges
- * to tablet boundaries. This may result in one input Range contributing to several
- * InputSplits.
- * <p>
- * Note: calls to {@link #disableAutoAdjustRanges()} is ignored when BatchScan is enabled.
- * <p>
- * This configuration is incompatible with:
- * <ul>
- * <li>{@link #offlineScan()}</li>
- * <li>{@link #localIterators()}</li>
- * <li>{@link #scanIsolation()}</li>
- * </ul>
- * <p>
- * By default, this feature is <b>disabled</b>.
- */
- BatchScanOptions batchScan();
-
- /**
- * @return newly created {@link InputInfo}
- */
- InputInfo build();
- }
- }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputFormatBuilder.java
new file mode 100644
index 0000000..e12d803
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputFormatBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.ClientInfo;
+
+/**
+ * Builder for all the information needed for the Map Reduce job. This fluent API is used via
+ * {@link AccumuloOutputFormat#configure()}, for example (variable values are illustrative):
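+ *
+ * <pre>
+ * AccumuloOutputFormat.configure().clientInfo(clientInfo).defaultTable(name).createTables()
+ *     .store(job);
+ * </pre>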
+ *
+ * @since 2.0
+ */
+public interface OutputFormatBuilder {
+
+ /**
+ * Required params for client
+ *
+ * @since 2.0
+ */
+ interface ClientParams<T> {
+ /**
+ * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
+ * param can be created using {@link ClientInfo#from(Properties)}
+ *
+ * @param clientInfo
+ * Accumulo connection information
+ */
+ OutputOptions<T> clientInfo(ClientInfo clientInfo);
+ }
+
+ /**
+ * Builder options
+ *
+ * @since 2.0
+ */
+ interface OutputOptions<T> {
+ /**
+ * Sets the default table name to use if one emits a null in place of a table name for a given
+ * mutation. Table names can only contain alphanumeric characters and underscores.
+ *
+ * @param tableName
+ * the table to use when the table name is null in the write call
+ */
+ OutputOptions<T> defaultTable(String tableName);
+
+ /**
+ * Enables the directive to create new tables as necessary. Table names can only contain
+ * alphanumeric characters and underscores.
+ * <p>
+ * By default, this feature is <b>disabled</b>.
+ */
+ OutputOptions<T> createTables();
+
+ /**
+ * Enables the directive to use simulation mode for this job. In simulation mode, no output is
+ * produced. This is useful for testing.
+ * <p>
+ * By default, this feature is <b>disabled</b>.
+ */
+ OutputOptions<T> simulationMode();
+
+ /**
+ * Finish configuring; verify and serialize the options into the Job or JobConf.
+ */
+ void store(T j);
+ }
+
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java
deleted file mode 100644
index 0ca7443..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoop.mapreduce;
-
-import java.nio.file.Path;
-import java.util.Optional;
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.hadoopImpl.mapreduce.OutputInfoImpl;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Object containing all the information needed for the Map Reduce job. This object is passed to
- * {@link AccumuloOutputFormat#setInfo(Job, OutputInfo)}. It uses a fluent API like so:
- *
- * <pre>
- * OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build();
- * </pre>
- *
- * @since 2.0
- */
-public interface OutputInfo {
- /**
- * @return the client info set using OutputInfo.builder().clientInfo(info)
- */
- ClientInfo getClientInfo();
-
- /**
- * @return the BatchWriterConfig set using OutputInfo.builder()...batchWriterOptions(conf)
- */
- Optional<BatchWriterConfig> getBatchWriterOptions();
-
- /**
- * @return the default tame name set using OutputInfo.builder()...defaultTableName(name)
- */
- Optional<String> getDefaultTableName();
-
- /**
- * @return boolean if creating tables or not
- */
- boolean isCreateTables();
-
- /**
- * @return boolean if running simulation mode or not
- */
- boolean isSimulationMode();
-
- /**
- * @return builder for creating a {@link OutputInfo}
- */
- public static OutputInfoBuilder.ClientParams builder() {
- return new OutputInfoImpl.OutputInfoBuilderImpl();
- }
-
- /**
- * Fluent API builder for OutputInfo
- *
- * @since 2.0
- */
- interface OutputInfoBuilder {
-
- /**
- * Required params for client
- *
- * @since 2.0
- */
- interface ClientParams {
- /**
- * Set the connection information needed to communicate with Accumulo in this job. ClientInfo
- * param can be created using {@link ClientInfo#from(Path)} or
- * {@link ClientInfo#from(Properties)}
- *
- * @param clientInfo
- * Accumulo connection information
- */
- OutputOptions clientInfo(ClientInfo clientInfo);
- }
-
- /**
- * Builder options
- *
- * @since 2.0
- */
- interface OutputOptions {
- /**
- * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
- * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the
- * configuration multiple times overwrites any previous configuration.
- *
- * @param bwConfig
- * the configuration for the {@link BatchWriter}
- */
- OutputOptions batchWriterOptions(BatchWriterConfig bwConfig);
-
- /**
- * Sets the default table name to use if one emits a null in place of a table name for a given
- * mutation. Table names can only be alpha-numeric and underscores.
- *
- * @param tableName
- * the table to use when the tablename is null in the write call
- */
- OutputOptions defaultTableName(String tableName);
-
- /**
- * Enables the directive to create new tables, as necessary. Table names can only be
- * alpha-numeric and underscores.
- * <p>
- * By default, this feature is <b>disabled</b>.
- */
- OutputOptions enableCreateTables();
-
- /**
- * Enables the directive to use simulation mode for this job. In simulation mode, no output is
- * produced. This is useful for testing.
- * <p>
- * By default, this feature is <b>disabled</b>.
- */
- OutputOptions enableSimulationMode();
-
- /**
- * @return newly created {@link OutputInfo}
- */
- OutputInfo build();
- }
- }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java
index 0f52588..49890ff 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java
@@ -27,7 +27,6 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -132,34 +131,6 @@ public class AccumuloOutputFormatImpl {
}
/**
- * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
- * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration
- * multiple times overwrites any previous configuration.
- *
- * @param job
- * the Hadoop job instance to be configured
- * @param bwConfig
- * the configuration for the {@link BatchWriter}
- * @since 1.5.0
- */
- public static void setBatchWriterOptions(JobConf job, BatchWriterConfig bwConfig) {
- OutputConfigurator.setBatchWriterOptions(CLASS, job, bwConfig);
- }
-
- /**
- * Gets the {@link BatchWriterConfig} settings.
- *
- * @param job
- * the Hadoop context for the configured job
- * @return the configuration object
- * @since 1.5.0
- * @see #setBatchWriterOptions(JobConf, BatchWriterConfig)
- */
- public static BatchWriterConfig getBatchWriterOptions(JobConf job) {
- return OutputConfigurator.getBatchWriterOptions(CLASS, job);
- }
-
- /**
* Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric
* and underscores.
*
@@ -249,7 +220,7 @@ public class AccumuloOutputFormatImpl {
if (!simulate) {
this.client = Accumulo.newClient().from(getClientInfo(job)).build();
- mtbw = client.createMultiTableBatchWriter(getBatchWriterOptions(job));
+ mtbw = client.createMultiTableBatchWriter();
}
}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java
index c51efa6..696f7f3 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java
@@ -27,7 +27,6 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -134,34 +133,6 @@ public class AccumuloOutputFormatImpl {
}
/**
- * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
- * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration
- * multiple times overwrites any previous configuration.
- *
- * @param job
- * the Hadoop job instance to be configured
- * @param bwConfig
- * the configuration for the {@link BatchWriter}
- * @since 1.5.0
- */
- public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
- OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig);
- }
-
- /**
- * Gets the {@link BatchWriterConfig} settings.
- *
- * @param context
- * the Hadoop context for the configured job
- * @return the configuration object
- * @since 1.5.0
- * @see #setBatchWriterOptions(Job, BatchWriterConfig)
- */
- public static BatchWriterConfig getBatchWriterOptions(JobContext context) {
- return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration());
- }
-
- /**
* Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric
* and underscores.
*
@@ -252,7 +223,7 @@ public class AccumuloOutputFormatImpl {
if (!simulate) {
this.client = Accumulo.newClient().from(getClientInfo(context)).build();
- mtbw = client.createMultiTableBatchWriter(getBatchWriterOptions(context));
+ mtbw = client.createMultiTableBatchWriter();
}
}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputFormatBuilderImpl.java
new file mode 100644
index 0000000..5e06e1f
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputFormatBuilderImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoopImpl.mapreduce;
+
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setCompressionType;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setDataBlockSize;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setFileBlockSize;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setIndexBlockSize;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setReplication;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSampler;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSummarizers;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.hadoop.mapreduce.FileOutputFormatBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+public class FileOutputFormatBuilderImpl<T> implements FileOutputFormatBuilder,
+ FileOutputFormatBuilder.PathParams<T>, FileOutputFormatBuilder.OutputOptions<T> {
+
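+ // A usage sketch of the fluent chain this class backs (illustrative only; the path,
+ // job variable, and "gz" codec are example values):
+ // AccumuloFileOutputFormat.configure().outputPath(path).compression("gz").store(job);
+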
+ Path outputPath;
+ Optional<String> comp = Optional.empty();
+ Optional<Long> dataBlockSize = Optional.empty();
+ Optional<Long> fileBlockSize = Optional.empty();
+ Optional<Long> indexBlockSize = Optional.empty();
+ Optional<Integer> replication = Optional.empty();
+ Optional<SamplerConfiguration> sampler = Optional.empty();
+ Collection<SummarizerConfiguration> summarizers = Collections.emptySet();
+
+ @Override
+ public OutputOptions<T> outputPath(Path path) {
+ this.outputPath = Objects.requireNonNull(path);
+ return this;
+ }
+
+ @Override
+ public OutputOptions<T> compression(String compressionType) {
+ this.comp = Optional.of(compressionType);
+ return this;
+ }
+
+ @Override
+ public OutputOptions<T> dataBlockSize(long dataBlockSize) {
+ this.dataBlockSize = Optional.of(dataBlockSize);
+ return this;
+ }
+
+ @Override
+ public OutputOptions<T> fileBlockSize(long fileBlockSize) {
+ this.fileBlockSize = Optional.of(fileBlockSize);
+ return this;
+ }
+
+ @Override
+ public OutputOptions<T> indexBlockSize(long indexBlockSize) {
+ this.indexBlockSize = Optional.of(indexBlockSize);
+ return this;
+ }
+
+ @Override
+ public OutputOptions<T> replication(int replication) {
+ this.replication = Optional.of(replication);
+ return this;
+ }
+
+ @Override
+ public OutputOptions<T> sampler(SamplerConfiguration samplerConfig) {
+ this.sampler = Optional.of(samplerConfig);
+ return this;
+ }
+
+ @Override
+ public OutputOptions<T> summarizers(SummarizerConfiguration... summarizerConfigs) {
+ this.summarizers = Arrays.asList(Objects.requireNonNull(summarizerConfigs));
+ return this;
+ }
+
+ @Override
+ public void store(T j) {
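+ // generic terminal step: accept exactly one of the two supported Hadoop job types,
+ // mapreduce.Job (new API) or mapred.JobConf (legacy API)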
+ if (j instanceof Job) {
+ store((Job) j);
+ } else if (j instanceof JobConf) {
+ store((JobConf) j);
+ } else {
+ throw new IllegalArgumentException("Unexpected type " + j.getClass().getName());
+ }
+ }
+
+ private void store(Job job) {
+ setOutputPath(job, outputPath);
+ if (comp.isPresent())
+ setCompressionType(job, comp.get());
+ if (dataBlockSize.isPresent())
+ setDataBlockSize(job, dataBlockSize.get());
+ if (fileBlockSize.isPresent())
+ setFileBlockSize(job, fileBlockSize.get());
+ if (indexBlockSize.isPresent())
+ setIndexBlockSize(job, indexBlockSize.get());
+ if (replication.isPresent())
+ setReplication(job, replication.get());
+ if (sampler.isPresent())
+ setSampler(job, sampler.get());
+ if (summarizers.size() > 0)
+ setSummarizers(job, summarizers.toArray(new SummarizerConfiguration[0]));
+ }
+
+ private void store(JobConf job) {
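+ // fully qualified calls here: the statically imported setters above are the mapreduce
+ // variants, so the mapred equivalents must be referenced explicitly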
+ org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(job, outputPath);
+ if (comp.isPresent())
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setCompressionType(job,
+ comp.get());
+ if (dataBlockSize.isPresent())
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setDataBlockSize(job,
+ dataBlockSize.get());
+ if (fileBlockSize.isPresent())
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setFileBlockSize(job,
+ fileBlockSize.get());
+ if (indexBlockSize.isPresent())
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setIndexBlockSize(job,
+ indexBlockSize.get());
+ if (replication.isPresent())
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setReplication(job,
+ replication.get());
+ if (sampler.isPresent())
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSampler(job,
+ sampler.get());
+ if (summarizers.size() > 0)
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSummarizers(job,
+ summarizers.toArray(new SummarizerConfiguration[0]));
+ }
+
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java
deleted file mode 100644
index 64d300e..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Objects;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
-import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
-import org.apache.hadoop.fs.Path;
-
-public class FileOutputInfoImpl implements FileOutputInfo {
- Path outputPath;
- Optional<String> comp;
- Optional<Long> dataBlockSize;
- Optional<Long> fileBlockSize;
- Optional<Long> indexBlockSize;
- Optional<Integer> replication;
- Optional<SamplerConfiguration> sampler;
- Collection<SummarizerConfiguration> summarizers;
-
- public FileOutputInfoImpl(Path outputPath, Optional<String> comp, Optional<Long> dataBlockSize,
- Optional<Long> fileBlockSize, Optional<Long> indexBlockSize, Optional<Integer> replication,
- Optional<SamplerConfiguration> sampler, Collection<SummarizerConfiguration> summarizers) {
- this.outputPath = outputPath;
- this.comp = comp;
- this.dataBlockSize = dataBlockSize;
- this.fileBlockSize = fileBlockSize;
- this.indexBlockSize = indexBlockSize;
- this.replication = replication;
- this.sampler = sampler;
- this.summarizers = summarizers;
- }
-
- @Override
- public Path getOutputPath() {
- return outputPath;
- }
-
- @Override
- public Optional<String> getCompressionType() {
- return comp;
- }
-
- @Override
- public Optional<Long> getDataBlockSize() {
- return dataBlockSize;
- }
-
- @Override
- public Optional<Long> getFileBlockSize() {
- return fileBlockSize;
- }
-
- @Override
- public Optional<Long> getIndexBlockSize() {
- return indexBlockSize;
- }
-
- @Override
- public Optional<Integer> getReplication() {
- return replication;
- }
-
- @Override
- public Optional<SamplerConfiguration> getSampler() {
- return sampler;
- }
-
- @Override
- public Collection<SummarizerConfiguration> getSummarizers() {
- return summarizers;
- }
-
- public static class FileOutputInfoBuilderImpl implements FileOutputInfoBuilder,
- FileOutputInfoBuilder.PathParams, FileOutputInfoBuilder.OutputOptions {
- Path outputPath;
- Optional<String> comp = Optional.empty();
- Optional<Long> dataBlockSize = Optional.empty();
- Optional<Long> fileBlockSize = Optional.empty();
- Optional<Long> indexBlockSize = Optional.empty();
- Optional<Integer> replication = Optional.empty();
- Optional<SamplerConfiguration> sampler = Optional.empty();
- Collection<SummarizerConfiguration> summarizers = Collections.emptySet();
-
- @Override
- public OutputOptions outputPath(Path path) {
- this.outputPath = Objects.requireNonNull(path);
- ;
- return this;
- }
-
- @Override
- public OutputOptions compressionType(String compressionType) {
- this.comp = Optional.of(compressionType);
- return this;
- }
-
- @Override
- public OutputOptions dataBlockSize(long dataBlockSize) {
- this.dataBlockSize = Optional.of(dataBlockSize);
- return this;
- }
-
- @Override
- public OutputOptions fileBlockSize(long fileBlockSize) {
- this.fileBlockSize = Optional.of(fileBlockSize);
- return this;
- }
-
- @Override
- public OutputOptions indexBlockSize(long indexBlockSize) {
- this.indexBlockSize = Optional.of(indexBlockSize);
- return this;
- }
-
- @Override
- public OutputOptions replication(int replication) {
- this.replication = Optional.of(replication);
- return this;
- }
-
- @Override
- public OutputOptions sampler(SamplerConfiguration samplerConfig) {
- this.sampler = Optional.of(samplerConfig);
- return this;
- }
-
- @Override
- public OutputOptions summarizers(SummarizerConfiguration... summarizerConfigs) {
- this.summarizers = Arrays.asList(Objects.requireNonNull(summarizerConfigs));
- return this;
- }
-
- @Override
- public FileOutputInfo build() {
- return new FileOutputInfoImpl(outputPath, comp, dataBlockSize, fileBlockSize, indexBlockSize,
- replication, sampler, summarizers);
- }
- }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
new file mode 100644
index 0000000..79deca6
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoopImpl.mapreduce;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class InputFormatBuilderImpl<T> implements InputFormatBuilder,
+ InputFormatBuilder.ClientParams<T>, InputFormatBuilder.TableParams<T>,
+ InputFormatBuilder.AuthsParams<T>, InputFormatBuilder.InputFormatOptions<T>,
+ InputFormatBuilder.ScanOptions<T>, InputFormatBuilder.BatchScanOptions<T> {
+
+ Class<?> callingClass;
+ String tableName;
+ ClientInfo clientInfo;
+ Authorizations scanAuths;
+
+ Optional<String> context = Optional.empty();
+ Collection<Range> ranges = Collections.emptyList();
+ Collection<IteratorSetting.Column> fetchColumns = Collections.emptyList();
+ Map<String,IteratorSetting> iterators = Collections.emptyMap();
+ Optional<SamplerConfiguration> samplerConfig = Optional.empty();
+ Map<String,String> hints = Collections.emptyMap();
+ BuilderBooleans bools = new BuilderBooleans();
+
+ public InputFormatBuilderImpl(Class<?> callingClass) {
+ this.callingClass = callingClass;
+ }
+
+ @Override
+ public InputFormatBuilder.TableParams<T> clientInfo(ClientInfo clientInfo) {
+ this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
+ return this;
+ }
+
+ @Override
+ public InputFormatBuilder.AuthsParams<T> table(String tableName) {
+ this.tableName = Objects.requireNonNull(tableName, "Table name must not be null");
+ return this;
+ }
+
+ @Override
+ public InputFormatBuilder.InputFormatOptions<T> auths(Authorizations auths) {
+ this.scanAuths = Objects.requireNonNull(auths, "Authorizations must not be null");
+ return this;
+ }
+
+ @Override
+ public InputFormatBuilder.InputFormatOptions<T> classLoaderContext(String context) {
+ this.context = Optional.of(context);
+ return this;
+ }
+
+ @Override
+ public InputFormatBuilder.InputFormatOptions<T> ranges(Collection<Range> ranges) {
+ this.ranges = ImmutableList
+ .copyOf(Objects.requireNonNull(ranges, "Collection of ranges is null"));
+ if (this.ranges.size() == 0)
+ throw new IllegalArgumentException("Specified collection of ranges is empty.");
+ return this;
+ }
+
+ @Override
+ public InputFormatBuilder.InputFormatOptions<T> fetchColumns(
+ Collection<IteratorSetting.Column> fetchColumns) {
+ this.fetchColumns = ImmutableList
+ .copyOf(Objects.requireNonNull(fetchColumns, "Collection of fetch columns is null"));
+ if (this.fetchColumns.size() == 0)
+ throw new IllegalArgumentException("Specified collection of fetch columns is empty.");
+ return this;
+ }
+
+ @Override
+ public InputFormatBuilder.InputFormatOptions<T> addIterator(IteratorSetting cfg) {
+ // store iterators by name to prevent duplicates
+ Objects.requireNonNull(cfg, "IteratorSetting must not be null.");
+ if (this.iterators.size() == 0)
+ this.iterators = new LinkedHashMap<>();
+ this.iterators.put(cfg.getName(), cfg);
+ return this;
+ }
+
+ @Override
+ public InputFormatBuilder.InputFormatOptions<T> executionHints(Map<String,String> hints) {
+ this.hints = ImmutableMap
+ .copyOf(Objects.requireNonNull(hints, "Map of execution hints must not be null."));
+ if (hints.size() == 0)
+ throw new IllegalArgumentException("Specified map of execution hints is empty.");
+ return this;
+ }
+
+ @Override
+ public InputFormatBuilder.InputFormatOptions<T> samplerConfiguration(
+ SamplerConfiguration samplerConfig) {
+ this.samplerConfig = Optional.of(samplerConfig);
+ return this;
+ }
+
+ @Override
+ public InputFormatOptions<T> disableAutoAdjustRanges() {
+ bools.autoAdjustRanges = false;
+ return this;
+ }
+
+ @Override
+ public ScanOptions<T> scanIsolation() {
+ bools.scanIsolation = true;
+ return this;
+ }
+
+ @Override
+ public ScanOptions<T> localIterators() {
+ bools.localIters = true;
+ return this;
+ }
+
+ @Override
+ public ScanOptions<T> offlineScan() {
+ bools.offlineScan = true;
+ return this;
+ }
+
+ @Override
+ public BatchScanOptions<T> batchScan() {
+ bools.batchScan = true;
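+ // batch scans always auto-adjust ranges, overriding any earlier disableAutoAdjustRanges() call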
+ bools.autoAdjustRanges = true;
+ return this;
+ }
+
+ @Override
+ public void store(T j) {
+ if (j instanceof Job) {
+ store((Job) j);
+ } else if (j instanceof JobConf) {
+ store((JobConf) j);
+ } else {
+ throw new IllegalArgumentException("Unexpected type " + j.getClass().getName());
+ }
+ }
+
+ /**
+ * Final builder method for mapreduce configuration
+ */
+ private void store(Job job) {
+ // TODO validate params are set correctly, possibly call/modify
+ // AbstractInputFormat.validateOptions()
+ AbstractInputFormat.setClientInfo(job, clientInfo);
+ AbstractInputFormat.setScanAuthorizations(job, scanAuths);
+ InputFormatBase.setInputTableName(job, tableName);
+
+ // all optional values
+ if (context.isPresent())
+ AbstractInputFormat.setClassLoaderContext(job, context.get());
+ if (ranges.size() > 0)
+ InputFormatBase.setRanges(job, ranges);
+ if (iterators.size() > 0)
+ InputConfigurator.writeIteratorsToConf(callingClass, job.getConfiguration(),
+ iterators.values());
+ if (fetchColumns.size() > 0)
+ InputConfigurator.fetchColumns(callingClass, job.getConfiguration(), fetchColumns);
+ if (samplerConfig.isPresent())
+ InputFormatBase.setSamplerConfiguration(job, samplerConfig.get());
+ if (hints.size() > 0)
+ InputFormatBase.setExecutionHints(job, hints);
+ InputFormatBase.setAutoAdjustRanges(job, bools.autoAdjustRanges);
+ InputFormatBase.setScanIsolation(job, bools.scanIsolation);
+ InputFormatBase.setLocalIterators(job, bools.localIters);
+ InputFormatBase.setOfflineTableScan(job, bools.offlineScan);
+ InputFormatBase.setBatchScan(job, bools.batchScan);
+ }
+
+ /**
+ * Final builder method for legacy mapred configuration
+ */
+ private void store(JobConf jobConf) {
+ // TODO validate params are set correctly, possibly call/modify
+ // AbstractInputFormat.validateOptions()
+ org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo(jobConf, clientInfo);
+ org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations(jobConf,
+ scanAuths);
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName(jobConf, tableName);
+
+ // all optional values
+ if (context.isPresent())
+ org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext(jobConf,
+ context.get());
+ if (ranges.size() > 0)
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges(jobConf, ranges);
+ if (iterators.size() > 0)
+ InputConfigurator.writeIteratorsToConf(callingClass, jobConf, iterators.values());
+ if (fetchColumns.size() > 0)
+ InputConfigurator.fetchColumns(callingClass, jobConf, fetchColumns);
+ if (samplerConfig.isPresent())
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration(jobConf,
+ samplerConfig.get());
+ if (hints.size() > 0)
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints(jobConf, hints);
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges(jobConf,
+ bools.autoAdjustRanges);
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation(jobConf,
+ bools.scanIsolation);
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators(jobConf,
+ bools.localIters);
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan(jobConf,
+ bools.offlineScan);
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan(jobConf, bools.batchScan);
+ }
+
+ private static class BuilderBooleans {
+ boolean autoAdjustRanges = true;
+ boolean scanIsolation = false;
+ boolean offlineScan = false;
+ boolean localIters = false;
+ boolean batchScan = false;
+ }
+}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java
deleted file mode 100644
index 878148a..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class InputInfoImpl implements InputInfo {
- String tableName;
- ClientInfo clientInfo;
- Authorizations scanAuths;
-
- // optional values
- Optional<String> context;
- Collection<Range> ranges;
- Collection<IteratorSetting.Column> fetchColumns;
- Map<String,IteratorSetting> iterators;
- Optional<SamplerConfiguration> samplerConfig;
- Map<String,String> hints;
- InputInfoBooleans bools;
-
- public InputInfoImpl(String tableName, ClientInfo clientInfo, Authorizations scanAuths,
- Optional<String> context, Collection<Range> ranges,
- Collection<IteratorSetting.Column> fetchColumns, Map<String,IteratorSetting> iterators,
- Optional<SamplerConfiguration> samplerConfig, Map<String,String> hints,
- InputInfoBooleans bools) {
- this.tableName = tableName;
- this.clientInfo = clientInfo;
- this.scanAuths = scanAuths;
- this.context = context;
- this.ranges = ranges;
- this.fetchColumns = fetchColumns;
- this.iterators = iterators;
- this.samplerConfig = samplerConfig;
- this.hints = hints;
- this.bools = bools;
- }
-
- @Override
- public String getTableName() {
- return tableName;
- }
-
- @Override
- public ClientInfo getClientInfo() {
- return clientInfo;
- }
-
- public Authorizations getScanAuths() {
- return scanAuths;
- }
-
- @Override
- public Optional<String> getContext() {
- return context;
- }
-
- @Override
- public Collection<Range> getRanges() {
- return ranges;
- }
-
- @Override
- public Collection<IteratorSetting.Column> getFetchColumns() {
- return fetchColumns;
- }
-
- @Override
- public Collection<IteratorSetting> getIterators() {
- return iterators.values();
- }
-
- @Override
- public Optional<SamplerConfiguration> getSamplerConfig() {
- return samplerConfig;
- }
-
- @Override
- public Map<String,String> getExecutionHints() {
- return hints;
- }
-
- @Override
- public boolean isAutoAdjustRanges() {
- return bools.autoAdjustRanges;
- }
-
- @Override
- public boolean isScanIsolation() {
- return bools.scanIsolation;
- }
-
- @Override
- public boolean isLocalIterators() {
- return bools.localIters;
- }
-
- @Override
- public boolean isOfflineScan() {
- return bools.offlineScan;
- }
-
- @Override
- public boolean isBatchScan() {
- return bools.batchScan;
- }
-
- private static class InputInfoBooleans {
- boolean autoAdjustRanges = true;
- boolean scanIsolation = false;
- boolean offlineScan = false;
- boolean localIters = false;
- boolean batchScan = false;
- }
-
- public static class InputInfoBuilderImpl
- implements InputInfoBuilder, InputInfoBuilder.ClientParams, InputInfoBuilder.TableParams,
- InputInfoBuilder.AuthsParams, InputInfoBuilder.InputFormatOptions,
- InputInfoBuilder.ScanOptions, InputInfoBuilder.BatchScanOptions {
-
- String tableName;
- ClientInfo clientInfo;
- Authorizations scanAuths;
-
- Optional<String> context = Optional.empty();
- Collection<Range> ranges = Collections.emptyList();
- Collection<IteratorSetting.Column> fetchColumns = Collections.emptyList();
- Map<String,IteratorSetting> iterators = Collections.emptyMap();
- Optional<SamplerConfiguration> samplerConfig = Optional.empty();
- Map<String,String> hints = Collections.emptyMap();
- InputInfoBooleans bools = new InputInfoBooleans();
-
- @Override
- public InputInfoBuilder.TableParams clientInfo(ClientInfo clientInfo) {
- this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
- return this;
- }
-
- @Override
- public InputInfoBuilder.AuthsParams table(String tableName) {
- this.tableName = Objects.requireNonNull(tableName, "Table name must not be null");
- return this;
- }
-
- @Override
- public InputInfoBuilder.InputFormatOptions scanAuths(Authorizations auths) {
- this.scanAuths = Objects.requireNonNull(auths, "Authorizations must not be null");
- return this;
- }
-
- @Override
- public InputInfoBuilder.InputFormatOptions classLoaderContext(String context) {
- this.context = Optional.of(context);
- return this;
- }
-
- @Override
- public InputInfoBuilder.InputFormatOptions ranges(Collection<Range> ranges) {
- this.ranges = ImmutableList
- .copyOf(Objects.requireNonNull(ranges, "Collection of ranges is null"));
- if (this.ranges.size() == 0)
- throw new IllegalArgumentException("Specified collection of ranges is empty.");
- return this;
- }
-
- @Override
- public InputInfoBuilder.InputFormatOptions fetchColumns(
- Collection<IteratorSetting.Column> fetchColumns) {
- this.fetchColumns = ImmutableList
- .copyOf(Objects.requireNonNull(fetchColumns, "Collection of fetch columns is null"));
- if (this.fetchColumns.size() == 0)
- throw new IllegalArgumentException("Specified collection of fetch columns is empty.");
- return this;
- }
-
- @Override
- public InputInfoBuilder.InputFormatOptions addIterator(IteratorSetting cfg) {
- // store iterators by name to prevent duplicates
- Objects.requireNonNull(cfg, "IteratorSetting must not be null.");
- if (this.iterators.size() == 0)
- this.iterators = new LinkedHashMap<>();
- this.iterators.put(cfg.getName(), cfg);
- return this;
- }
-
- @Override
- public InputInfoBuilder.InputFormatOptions executionHints(Map<String,String> hints) {
- this.hints = ImmutableMap
- .copyOf(Objects.requireNonNull(hints, "Map of execution hints must not be null."));
- if (hints.size() == 0)
- throw new IllegalArgumentException("Specified map of execution hints is empty.");
- return this;
- }
-
- @Override
- public InputInfoBuilder.InputFormatOptions samplerConfiguration(
- SamplerConfiguration samplerConfig) {
- this.samplerConfig = Optional.of(samplerConfig);
- return this;
- }
-
- @Override
- public InputFormatOptions disableAutoAdjustRanges() {
- bools.autoAdjustRanges = false;
- return this;
- }
-
- @Override
- public ScanOptions scanIsolation() {
- bools.scanIsolation = true;
- return this;
- }
-
- @Override
- public ScanOptions localIterators() {
- bools.localIters = true;
- return this;
- }
-
- @Override
- public ScanOptions offlineScan() {
- bools.offlineScan = true;
- return this;
- }
-
- @Override
- public BatchScanOptions batchScan() {
- bools.batchScan = true;
- bools.autoAdjustRanges = true;
- return this;
- }
-
- @Override
- public InputInfo build() {
- return new InputInfoImpl(tableName, clientInfo, scanAuths, context, ranges, fetchColumns,
- iterators, samplerConfig, hints, bools);
- }
- }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
new file mode 100644
index 0000000..0e9a1ca
--- /dev/null
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoopImpl.mapreduce;
+
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setClientInfo;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setCreateTables;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setDefaultTableName;
+import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setSimulationMode;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.hadoop.mapreduce.OutputFormatBuilder;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+public class OutputFormatBuilderImpl<T>
+ implements OutputFormatBuilder.ClientParams<T>, OutputFormatBuilder.OutputOptions<T> {
+ ClientInfo clientInfo;
+
+ // optional values
+ Optional<String> defaultTableName = Optional.empty();
+ boolean createTables = false;
+ boolean simulationMode = false;
+
+ @Override
+ public OutputFormatBuilder.OutputOptions<T> clientInfo(ClientInfo clientInfo) {
+ this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
+ return this;
+ }
+
+ @Override
+ public OutputFormatBuilder.OutputOptions<T> defaultTable(String tableName) {
+ this.defaultTableName = Optional.of(tableName);
+ return this;
+ }
+
+ @Override
+ public OutputFormatBuilder.OutputOptions<T> createTables() {
+ this.createTables = true;
+ return this;
+ }
+
+ @Override
+ public OutputFormatBuilder.OutputOptions<T> simulationMode() {
+ this.simulationMode = true;
+ return this;
+ }
+
+ @Override
+ public void store(T j) {
+ if (j instanceof Job) {
+ store((Job) j);
+ } else if (j instanceof JobConf) {
+ store((JobConf) j);
+ } else {
+ throw new IllegalArgumentException("Unexpected type " + j.getClass().getName());
+ }
+ }
+
+ private void store(Job job) {
+ setClientInfo(job, clientInfo);
+ if (defaultTableName.isPresent())
+ setDefaultTableName(job, defaultTableName.get());
+ setCreateTables(job, createTables);
+ setSimulationMode(job, simulationMode);
+ }
+
+ private void store(JobConf jobConf) {
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setClientInfo(jobConf,
+ clientInfo);
+ if (defaultTableName.isPresent())
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setDefaultTableName(jobConf,
+ defaultTableName.get());
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setCreateTables(jobConf,
+ createTables);
+ org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setSimulationMode(jobConf,
+ simulationMode);
+
+ }
+
+}
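
For reference, a minimal usage sketch of the fluent API this builder backs, as it is invoked throughout the tests below (the client.properties path and table name are placeholders, not part of this commit):

    import java.nio.file.Paths;
    import org.apache.accumulo.core.client.ClientInfo;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class OutputConfigSketch {
      public static void main(String[] args) throws Exception {
        ClientInfo info = ClientInfo.from(Paths.get("client.properties")); // placeholder path
        Job job = Job.getInstance();
        AccumuloOutputFormat.configure()
            .clientInfo(info)          // required first step
            .defaultTable("mytable")   // optional: fallback table name
            .createTables()            // optional: create missing tables
            .store(job);               // store(JobConf) is the mapred equivalent
      }
    }
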
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java
deleted file mode 100644
index 27c94d1..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce;
-
-import java.util.Objects;
-import java.util.Optional;
-
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
-
-public class OutputInfoImpl implements OutputInfo {
- ClientInfo clientInfo;
-
- // optional values
- Optional<String> defaultTableName;
- Optional<BatchWriterConfig> bwConfig;
- boolean createTables;
- boolean simulationMode;
-
- public OutputInfoImpl(ClientInfo ci, Optional<String> defaultTableName,
- Optional<BatchWriterConfig> bwConfig, boolean createTables, boolean simulationMode) {
- this.clientInfo = ci;
- this.defaultTableName = defaultTableName;
- this.bwConfig = bwConfig;
- this.createTables = createTables;
- this.simulationMode = simulationMode;
- }
-
- @Override
- public ClientInfo getClientInfo() {
- return clientInfo;
- }
-
- @Override
- public Optional<BatchWriterConfig> getBatchWriterOptions() {
- return bwConfig;
- }
-
- @Override
- public Optional<String> getDefaultTableName() {
- return defaultTableName;
- }
-
- @Override
- public boolean isCreateTables() {
- return createTables;
- }
-
- @Override
- public boolean isSimulationMode() {
- return simulationMode;
- }
-
- public static class OutputInfoBuilderImpl implements OutputInfoBuilder,
- OutputInfoBuilder.ClientParams, OutputInfoBuilder.OutputOptions {
- ClientInfo clientInfo;
-
- // optional values
- Optional<String> defaultTableName = Optional.empty();
- Optional<BatchWriterConfig> bwConfig = Optional.empty();
- boolean createTables = false;
- boolean simulationMode = false;
-
- @Override
- public OutputOptions clientInfo(ClientInfo clientInfo) {
- this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null");
- return this;
- }
-
- @Override
- public OutputOptions batchWriterOptions(BatchWriterConfig bwConfig) {
- this.bwConfig = Optional.of(bwConfig);
- return this;
- }
-
- @Override
- public OutputOptions defaultTableName(String tableName) {
- this.defaultTableName = Optional.of(tableName);
- return this;
- }
-
- @Override
- public OutputOptions enableCreateTables() {
- this.createTables = true;
- return this;
- }
-
- @Override
- public OutputOptions enableSimulationMode() {
- this.simulationMode = true;
- return this;
- }
-
- @Override
- public OutputInfo build() {
- return new OutputInfoImpl(clientInfo, defaultTableName, bwConfig, createTables,
- simulationMode);
- }
- }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
index 772edc1..6f4ff28 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
@@ -19,8 +19,6 @@ package org.apache.accumulo.hadoopImpl.mapreduce.lib;
import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
import org.apache.hadoop.mapreduce.Job;
import com.beust.jcommander.Parameter;
@@ -41,10 +39,9 @@ public class MapReduceClientOnDefaultTable extends MapReduceClientOpts {
public void setAccumuloConfigs(Job job) {
final String tableName = getTableName();
final ClientInfo info = getClientInfo();
- AccumuloInputFormat.setInfo(job,
- InputInfo.builder().clientInfo(info).table(tableName).scanAuths(auths).build());
- AccumuloOutputFormat.setInfo(job, OutputInfo.builder().clientInfo(info)
- .defaultTableName(tableName).enableCreateTables().build());
+ AccumuloInputFormat.configure().clientInfo(info).table(tableName).auths(auths).store(job);
+ AccumuloOutputFormat.configure().clientInfo(info).defaultTable(tableName).createTables()
+ .store(job);
}
}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
index e6c91db..76efb96 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
@@ -19,8 +19,6 @@ package org.apache.accumulo.hadoopImpl.mapreduce.lib;
import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
import org.apache.hadoop.mapreduce.Job;
import com.beust.jcommander.Parameter;
@@ -34,10 +32,9 @@ public class MapReduceClientOnRequiredTable extends MapReduceClientOpts {
public void setAccumuloConfigs(Job job) {
final String tableName = getTableName();
final ClientInfo info = getClientInfo();
- AccumuloInputFormat.setInfo(job,
- InputInfo.builder().clientInfo(info).table(tableName).scanAuths(auths).build());
- AccumuloOutputFormat.setInfo(job, OutputInfo.builder().clientInfo(info)
- .defaultTableName(tableName).enableCreateTables().build());
+ System.out.println("MIKE here is dahhhhhhhhhhhhhhhhhhh PRINCIPAL= " + info.getPrincipal());
+ AccumuloInputFormat.configure().clientInfo(info).table(tableName).auths(auths).store(job);
+ AccumuloOutputFormat.configure().clientInfo(info).defaultTable(tableName).createTables()
+ .store(job);
}
public String getTableName() {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java
index 12ce572..9f325d4 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java
@@ -16,16 +16,19 @@
*/
package org.apache.accumulo.hadoopImpl.mapreduce.lib;
-import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_DURABILITY;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC;
+import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.clientImpl.DurabilityImpl;
import org.apache.hadoop.conf.Configuration;
/**
@@ -85,61 +88,30 @@ public class OutputConfigurator extends ConfiguratorBase {
}
/**
- * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new
- * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration
- * multiple times overwrites any previous configuration.
- *
- * @param implementingClass
- * the class whose name will be used as a prefix for the property configuration key
- * @param conf
- * the Hadoop configuration object to configure
- * @param bwConfig
- * the configuration for the {@link BatchWriter}
- * @since 1.6.0
- */
- public static void setBatchWriterOptions(Class<?> implementingClass, Configuration conf,
- BatchWriterConfig bwConfig) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- String serialized;
- try {
- bwConfig.write(new DataOutputStream(baos));
- serialized = new String(baos.toByteArray(), UTF_8);
- baos.close();
- } catch (IOException e) {
- throw new IllegalArgumentException(
- "unable to serialize " + BatchWriterConfig.class.getName());
- }
- conf.set(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG), serialized);
- }
-
- /**
- * Gets the {@link BatchWriterConfig} settings.
- *
- * @param implementingClass
- * the class whose name will be used as a prefix for the property configuration key
- * @param conf
- * the Hadoop configuration object to configure
- * @return the configuration object
- * @since 1.6.0
- * @see #setBatchWriterOptions(Class, Configuration, BatchWriterConfig)
+ * Gets the {@link BatchWriterConfig} settings derived from the client properties stored with the {@link ClientInfo}
*/
public static BatchWriterConfig getBatchWriterOptions(Class<?> implementingClass,
Configuration conf) {
- String serialized = conf.get(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG));
BatchWriterConfig bwConfig = new BatchWriterConfig();
- if (serialized == null || serialized.isEmpty()) {
- return bwConfig;
- } else {
- try {
- ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(UTF_8));
- bwConfig.readFields(new DataInputStream(bais));
- bais.close();
- return bwConfig;
- } catch (IOException e) {
- throw new IllegalArgumentException(
- "unable to serialize " + BatchWriterConfig.class.getName());
- }
- }
+ ClientInfo info = getClientInfo(implementingClass, conf);
+ Properties props = info.getProperties();
+ String property = props.getProperty(BATCH_WRITER_DURABILITY.getKey());
+ if (property != null)
+ bwConfig.setDurability(DurabilityImpl.fromString(property));
+ property = props.getProperty(BATCH_WRITER_MAX_LATENCY_SEC.getKey());
+ if (property != null)
+ bwConfig.setMaxLatency(Long.parseLong(property), TimeUnit.SECONDS);
+ property = props.getProperty(BATCH_WRITER_MAX_MEMORY_BYTES.getKey());
+ if (property != null)
+ bwConfig.setMaxMemory(Long.parseLong(property));
+ property = props.getProperty(BATCH_WRITER_MAX_TIMEOUT_SEC.getKey());
+ if (property != null)
+ bwConfig.setTimeout(Long.parseLong(property), TimeUnit.SECONDS);
+ property = props.getProperty(BATCH_WRITER_MAX_WRITE_THREADS.getKey());
+ if (property != null)
+ bwConfig.setMaxWriteThreads(Integer.parseInt(property));
+
+ return bwConfig;
}
/**
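
Because the *_SEC keys are defined in seconds, the parsed longs must be applied with TimeUnit.SECONDS rather than TimeUnit.MILLISECONDS (a millisecond interpretation would silently shrink latency and timeout a thousandfold). A minimal sketch of the mapping this method performs, with an illustrative property value:

    Properties props = new Properties();
    props.setProperty(BATCH_WRITER_MAX_LATENCY_SEC.getKey(), "120"); // 120 seconds
    BatchWriterConfig bwConfig = new BatchWriterConfig();
    String latency = props.getProperty(BATCH_WRITER_MAX_LATENCY_SEC.getKey());
    if (latency != null)
      bwConfig.setMaxLatency(Long.parseLong(latency), TimeUnit.SECONDS); // key is in seconds
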
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java
new file mode 100644
index 0000000..e7bd8f7
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.ConfiguratorBase;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapred.AccumuloFileOutputFormat;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
+ private static final Logger log = LoggerFactory.getLogger(AccumuloFileOutputFormatIT.class);
+ private static final int JOB_VISIBILITY_CACHE_SIZE = 3000;
+ private static final String PREFIX = AccumuloFileOutputFormatIT.class.getSimpleName();
+ private static final String BAD_TABLE = PREFIX + "_mapred_bad_table";
+ private static final String TEST_TABLE = PREFIX + "_mapred_test_table";
+ private static final String EMPTY_TABLE = PREFIX + "_mapred_empty_table";
+
+ private static AssertionError e1 = null;
+ private static AssertionError e2 = null;
+
+ private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(
+ RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption("modulus", "3");
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(
+ new File(System.getProperty("user.dir") + "/target"));
+
+ @Test
+ public void testEmptyWrite() throws Exception {
+ try (AccumuloClient c = getAccumuloClient()) {
+ c.tableOperations().create(EMPTY_TABLE);
+ handleWriteTests(false);
+ }
+ }
+
+ @Test
+ public void testRealWrite() throws Exception {
+ try (AccumuloClient c = getAccumuloClient()) {
+ c.tableOperations().create(TEST_TABLE);
+ BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
+ Mutation m = new Mutation("Key");
+ m.put("", "", "");
+ bw.addMutation(m);
+ bw.close();
+ handleWriteTests(true);
+ }
+ }
+
+ private static class MRTester extends Configured implements Tool {
+ private static class BadKeyMapper implements Mapper<Key,Value,Key,Value> {
+
+ int index = 0;
+
+ @Override
+ public void map(Key key, Value value, OutputCollector<Key,Value> output, Reporter reporter)
+ throws IOException {
+ try {
+ try {
+ output.collect(key, value);
+ if (index == 2)
+ fail();
+ } catch (Exception e) {
+ log.error(e.toString(), e);
+ assertEquals(2, index);
+ }
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ index++;
+ }
+
+ @Override
+ public void configure(JobConf job) {}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ assertEquals(2, index);
+ } catch (AssertionError e) {
+ e2 = e;
+ }
+ }
+
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 2) {
+ throw new IllegalArgumentException(
+ "Usage : " + MRTester.class.getName() + " <table> <outputfile>");
+ }
+
+ String table = args[0];
+
+ JobConf job = new JobConf(getConf());
+ job.setJarByClass(this.getClass());
+ ConfiguratorBase.setVisibilityCacheSize(job, JOB_VISIBILITY_CACHE_SIZE);
+
+ job.setInputFormat(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).store(job);
+ AccumuloFileOutputFormat.configure().outputPath(new Path(args[1])).sampler(SAMPLER_CONFIG)
+ .store(job);
+
+ job.setMapperClass(BAD_TABLE.equals(table) ? BadKeyMapper.class : IdentityMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormat(AccumuloFileOutputFormat.class);
+
+ job.setNumReduceTasks(0);
+
+ return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.framework.name", "local");
+ conf.set("mapreduce.cluster.local.dir",
+ new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+ }
+ }
+
+ private void handleWriteTests(boolean content) throws Exception {
+ File f = folder.newFile(testName.getMethodName());
+ if (f.delete()) {
+ log.debug("Deleted {}", f);
+ }
+ MRTester.main(new String[] {content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
+
+ assertTrue(f.exists());
+ File[] files = f.listFiles(file -> file.getName().startsWith("part-m-"));
+ assertNotNull(files);
+ if (content) {
+ assertEquals(1, files.length);
+ assertTrue(files[0].exists());
+
+ Configuration conf = CachedConfiguration.getInstance();
+ DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
+ FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder()
+ .forFile(files[0].toString(), FileSystem.getLocal(conf), conf)
+ .withTableConfiguration(acuconf).build()
+ .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
+ assertNotNull(sample);
+ } else {
+ assertEquals(0, files.length);
+ }
+ }
+
+ @Test
+ public void writeBadVisibility() throws Exception {
+ try (AccumuloClient c = getAccumuloClient()) {
+ c.tableOperations().create(BAD_TABLE);
+ BatchWriter bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
+ Mutation m = new Mutation("r1");
+ m.put("cf1", "cq1", "A&B");
+ m.put("cf1", "cq1", "A&B");
+ m.put("cf1", "cq2", "A&");
+ bw.addMutation(m);
+ bw.close();
+ File f = folder.newFile(testName.getMethodName());
+ if (f.delete()) {
+ log.debug("Deleted {}", f);
+ }
+ MRTester.main(new String[] {BAD_TABLE, f.getAbsolutePath()});
+ assertNull(e1);
+ assertNull(e2);
+ }
+ }
+}
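
The file output side configured in run() above, shown in isolation (the output directory is a placeholder; sampler() is only needed when sample data should be embedded in the produced RFiles):

    AccumuloFileOutputFormat.configure()
        .outputPath(new Path("/tmp/mr-rfiles")) // placeholder output directory
        .sampler(SAMPLER_CONFIG)                // optional: embed sample data
        .store(job);                            // accepts a Job or a JobConf
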
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloInputFormatIT.java
new file mode 100644
index 0000000..083865f
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloInputFormatIT.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapred.RangeInputSplit;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
+
+ @BeforeClass
+ public static void setupClass() {
+ System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
+ }
+
+ private static AssertionError e1 = null;
+ private static int e1Count = 0;
+ private static AssertionError e2 = null;
+ private static int e2Count = 0;
+
+ private static class MRTester extends Configured implements Tool {
+ private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+ Key key = null;
+ int count = 0;
+
+ @Override
+ public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter)
+ throws IOException {
+ try {
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+ assertEquals(new String(v.get()), String.format("%09x", count));
+ } catch (AssertionError e) {
+ e1 = e;
+ e1Count++;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ public void configure(JobConf job) {}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ assertEquals(100, count);
+ } catch (AssertionError e) {
+ e2 = e;
+ e2Count++;
+ }
+ }
+
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 1 && args.length != 3) {
+ throw new IllegalArgumentException(
+ "Usage : " + MRTester.class.getName() + " <table> [<batchScan> <scan sample>]");
+ }
+
+ String table = args[0];
+ boolean batchScan = false;
+ boolean sample = false;
+ if (args.length == 3) {
+ batchScan = Boolean.parseBoolean(args[1]);
+ sample = Boolean.parseBoolean(args[2]);
+ }
+
+ JobConf job = new JobConf(getConf());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormat(AccumuloInputFormat.class);
+
+ InputFormatBuilder.InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure()
+ .clientInfo(getClientInfo()).table(table).auths(Authorizations.EMPTY);
+ if (batchScan)
+ opts.batchScan();
+ if (sample) {
+ opts.samplerConfiguration(SAMPLER_CONFIG);
+ }
+ opts.store(job);
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormat(NullOutputFormat.class);
+
+ job.setNumReduceTasks(0);
+
+ return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String... args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.framework.name", "local");
+ conf.set("mapreduce.cluster.local.dir",
+ new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+ }
+ }
+
+ @Test
+ public void testMap() throws Exception {
+ String table = getUniqueNames(1)[0];
+ try (AccumuloClient c = getAccumuloClient()) {
+ c.tableOperations().create(table);
+ BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ e1 = null;
+ e2 = null;
+
+ MRTester.main(table);
+ assertNull(e1);
+ assertNull(e2);
+ }
+ }
+
+ private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(
+ RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption("modulus", "3");
+
+ @Test
+ public void testSample() throws Exception {
+ final String TEST_TABLE_3 = getUniqueNames(1)[0];
+
+ try (AccumuloClient c = getAccumuloClient()) {
+ c.tableOperations().create(TEST_TABLE_3,
+ new NewTableConfiguration().enableSampling(SAMPLER_CONFIG));
+ BatchWriter bw = c.createBatchWriter(TEST_TABLE_3, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ MRTester.main(TEST_TABLE_3, "False", "True");
+ assertEquals(38, e1Count);
+ assertEquals(1, e2Count);
+
+ e2Count = e1Count = 0;
+ MRTester.main(TEST_TABLE_3, "False", "False");
+ assertEquals(0, e1Count);
+ assertEquals(0, e2Count);
+
+ e2Count = e1Count = 0;
+ MRTester.main(TEST_TABLE_3, "True", "True");
+ assertEquals(38, e1Count);
+ assertEquals(1, e2Count);
+ }
+ }
+
+ @Test
+ public void testCorrectRangeInputSplits() throws Exception {
+ JobConf job = new JobConf();
+
+ String table = getUniqueNames(1)[0];
+ Authorizations auths = new Authorizations("foo");
+ Collection<IteratorSetting.Column> fetchColumns = Collections
+ .singleton(new IteratorSetting.Column(new Text("foo"), new Text("bar")));
+
+ try (AccumuloClient accumuloClient = getAccumuloClient()) {
+ accumuloClient.tableOperations().create(table);
+
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table).auths(auths)
+ .fetchColumns(fetchColumns).scanIsolation().localIterators().store(job);
+
+ AccumuloInputFormat aif = new AccumuloInputFormat();
+
+ InputSplit[] splits = aif.getSplits(job, 1);
+
+ assertEquals(1, splits.length);
+
+ InputSplit split = splits[0];
+
+ assertEquals(RangeInputSplit.class, split.getClass());
+
+ RangeInputSplit risplit = (RangeInputSplit) split;
+
+ assertEquals(table, risplit.getTableName());
+ assertTrue(risplit.isIsolatedScan());
+ assertTrue(risplit.usesLocalIterators());
+ assertEquals(fetchColumns, risplit.getFetchedColumns());
+ }
+ }
+}
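
Taken together, the input options these tests exercise piecemeal all chain off one configure() call; a sketch with placeholder table, range, and column values (getClientInfo() stands in for whatever supplies the connection details):

    InputFormatBuilder.InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure()
        .clientInfo(getClientInfo())   // required: connection details
        .table("mytable")              // required: table to read
        .auths(Authorizations.EMPTY);  // required: scan authorizations
    opts.ranges(ranges)                // optional: limit the rows scanned
        .fetchColumns(fetchColumns)    // optional: limit the columns returned
        .batchScan()                   // optional: read via a BatchScanner
        .store(job);
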
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloOutputFormatIT.java
new file mode 100644
index 0000000..973ebc3
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloOutputFormatIT.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloOutputFormatIT extends ConfigurableMacBase {
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "1");
+ cfg.setNumTservers(1);
+ }
+
+ // Prevent regression of ACCUMULO-3709.
+ @Test
+ public void testMapred() throws Exception {
+ try (AccumuloClient accumuloClient = getClient()) {
+ // create a table and put some data in it
+ accumuloClient.tableOperations().create(testName.getMethodName());
+
+ JobConf job = new JobConf();
+ BatchWriterConfig batchConfig = new BatchWriterConfig();
+ // no flushes: a max latency of 0 disables latency-based flushing
+ batchConfig.setMaxLatency(0, TimeUnit.MILLISECONDS);
+ // use a single thread to ensure our update session times out
+ batchConfig.setMaxWriteThreads(1);
+ // set the max memory so that we ensure we don't flush on the write.
+ batchConfig.setMaxMemory(Long.MAX_VALUE);
+ AccumuloOutputFormat outputFormat = new AccumuloOutputFormat();
+ AccumuloOutputFormat.configure().clientInfo(getClientInfo(batchConfig)).store(job);
+ RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null);
+
+ try {
+ for (int i = 0; i < 3; i++) {
+ Mutation m = new Mutation(new Text(String.format("%08d", i)));
+ for (int j = 0; j < 3; j++) {
+ m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8)));
+ }
+ writer.write(new Text(testName.getMethodName()), m);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ // we don't want the exception to come from write
+ }
+
+ accumuloClient.securityOperations().revokeTablePermission("root", testName.getMethodName(),
+ TablePermission.WRITE);
+
+ try {
+ writer.close(null);
+ fail("Did not throw exception");
+ } catch (IOException ex) {
+ log.info(ex.getMessage(), ex);
+ assertTrue(ex.getCause() instanceof MutationsRejectedException);
+ }
+ }
+ }
+
+ private static AssertionError e1 = null;
+
+ private static class MRTester extends Configured implements Tool {
+ private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
+ Key key = null;
+ int count = 0;
+ OutputCollector<Text,Mutation> finalOutput;
+
+ @Override
+ public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) {
+ finalOutput = output;
+ try {
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+ assertEquals(new String(v.get()), String.format("%09x", count));
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ public void configure(JobConf job) {}
+
+ @Override
+ public void close() throws IOException {
+ Mutation m = new Mutation("total");
+ m.put("", "", Integer.toString(count));
+ finalOutput.collect(new Text(), m);
+ }
+
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 6) {
+ throw new IllegalArgumentException("Usage : " + MRTester.class.getName()
+ + " <user> <pass> <inputtable> <outputtable> <instanceName> <zooKeepers>");
+ }
+
+ String user = args[0];
+ String pass = args[1];
+ String table1 = args[2];
+ String table2 = args[3];
+ String instanceName = args[4];
+ String zooKeepers = args[5];
+
+ JobConf job = new JobConf(getConf());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormat(AccumuloInputFormat.class);
+
+ ClientInfo info = Accumulo.newClient().to(instanceName, zooKeepers).as(user, pass).info();
+
+ AccumuloInputFormat.configure().clientInfo(info).table(table1).auths(Authorizations.EMPTY)
+ .store(job);
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormat(AccumuloOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Mutation.class);
+
+ AccumuloOutputFormat.configure().clientInfo(info).defaultTable(table2).store(job);
+
+ job.setNumReduceTasks(0);
+
+ return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.framework.name", "local");
+ conf.set("mapreduce.cluster.local.dir",
+ new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+ }
+ }
+
+ @Test
+ public void testMR() throws Exception {
+ try (AccumuloClient c = getClient()) {
+ String instanceName = getCluster().getInstanceName();
+ String table1 = instanceName + "_t1";
+ String table2 = instanceName + "_t2";
+ c.tableOperations().create(table1);
+ c.tableOperations().create(table2);
+ BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ MRTester.main(new String[] {"root", ROOT_PASSWORD, table1, table2, instanceName,
+ getCluster().getZooKeepers()});
+ assertNull(e1);
+
+ try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ assertTrue(iter.hasNext());
+ Entry<Key,Value> entry = iter.next();
+ assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+ assertFalse(iter.hasNext());
+ }
+ }
+ }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java
index 2147554..c385437 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloRowInputFormatIT.java
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.accumulo.hadoop.mapred.AccumuloRowInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -166,8 +165,8 @@ public class AccumuloRowInputFormatIT extends AccumuloClusterHarness {
job.setInputFormat(AccumuloRowInputFormat.class);
- AccumuloRowInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo())
- .table(table).scanAuths(Authorizations.EMPTY).build());
+ AccumuloRowInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).store(job);
job.setMapperClass(TestMapper.class);
job.setMapOutputKeyClass(Key.class);
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/TokenFileIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/TokenFileIT.java
new file mode 100644
index 0000000..fb4e4c3
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/TokenFileIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class TokenFileIT extends AccumuloClusterHarness {
+ private static AssertionError e1 = null;
+
+ private static class MRTokenFileTester extends Configured implements Tool {
+ private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
+ Key key = null;
+ int count = 0;
+ OutputCollector<Text,Mutation> finalOutput;
+
+ @Override
+ public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter)
+ throws IOException {
+ finalOutput = output;
+ try {
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+ assertEquals(new String(v.get()), String.format("%09x", count));
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ public void configure(JobConf job) {}
+
+ @Override
+ public void close() throws IOException {
+ Mutation m = new Mutation("total");
+ m.put("", "", Integer.toString(count));
+ finalOutput.collect(new Text(), m);
+ }
+
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 3) {
+ throw new IllegalArgumentException("Usage : " + MRTokenFileTester.class.getName()
+ + " <token file> <inputtable> <outputtable>");
+ }
+
+ String tokenFile = args[0];
+ ClientInfo ci = ClientInfo.from(Paths.get(tokenFile));
+ String table1 = args[1];
+ String table2 = args[2];
+
+ JobConf job = new JobConf(getConf());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormat(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.configure().clientInfo(ci).table(table1).auths(Authorizations.EMPTY)
+ .store(job);
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormat(AccumuloOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Mutation.class);
+
+ AccumuloOutputFormat.configure().clientInfo(ci).defaultTable(table2).store(job);
+
+ job.setNumReduceTasks(0);
+
+ return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+ }
+
+ @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
+ public static void main(String[] args) throws Exception {
+ Configuration conf = CachedConfiguration.getInstance();
+ conf.set("hadoop.tmp.dir", new File(args[0]).getParent());
+ conf.set("mapreduce.framework.name", "local");
+ conf.set("mapreduce.cluster.local.dir",
+ new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
+ }
+ }
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(
+ new File(System.getProperty("user.dir") + "/target"));
+
+ @Test
+ public void testMR() throws Exception {
+ String[] tableNames = getUniqueNames(2);
+ String table1 = tableNames[0];
+ String table2 = tableNames[1];
+ try (AccumuloClient c = getAccumuloClient()) {
+ c.tableOperations().create(table1);
+ c.tableOperations().create(table2);
+ BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ File tf = folder.newFile("client.properties");
+ try (PrintStream out = new PrintStream(tf)) {
+ getClientInfo().getProperties().store(out, "Credentials for " + getClass().getName());
+ }
+
+ MRTokenFileTester.main(new String[] {tf.getAbsolutePath(), table1, table2});
+ assertNull(e1);
+
+ try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ assertTrue(iter.hasNext());
+ Entry<Key,Value> entry = iter.next();
+ assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+ assertFalse(iter.hasNext());
+ }
+ }
+ }
+}
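
The property-file round trip this test relies on, in isolation (the file name and comment string are placeholders):

    // write the client configuration, including credentials, to a file
    File tf = new File("client.properties");
    try (PrintStream out = new PrintStream(tf)) {
      getClientInfo().getProperties().store(out, "Accumulo client settings");
    }
    // later, e.g. inside the job, rebuild the ClientInfo from that same file
    ClientInfo ci = ClientInfo.from(Paths.get(tf.getAbsolutePath()));
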
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java
new file mode 100644
index 0000000..7c482a4
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
+
+ private String PREFIX;
+ private String BAD_TABLE;
+ private String TEST_TABLE;
+ private String EMPTY_TABLE;
+
+ private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(
+ RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption("modulus", "3");
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 4 * 60;
+ }
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(
+ new File(System.getProperty("user.dir") + "/target"));
+
+ @Before
+ public void setup() throws Exception {
+ PREFIX = testName.getMethodName() + "_";
+ BAD_TABLE = PREFIX + "_mapreduce_bad_table";
+ TEST_TABLE = PREFIX + "_mapreduce_test_table";
+ EMPTY_TABLE = PREFIX + "_mapreduce_empty_table";
+
+ try (AccumuloClient c = getAccumuloClient()) {
+ c.tableOperations().create(EMPTY_TABLE);
+ c.tableOperations().create(TEST_TABLE);
+ c.tableOperations().create(BAD_TABLE);
+ BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
+ Mutation m = new Mutation("Key");
+ m.put("", "", "");
+ bw.addMutation(m);
+ bw.close();
+ bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
+ m = new Mutation("r1");
+ m.put("cf1", "cq1", "A&B");
+ m.put("cf1", "cq1", "A&B");
+ m.put("cf1", "cq2", "A&");
+ bw.addMutation(m);
+ bw.close();
+ }
+ }
+
+ @Test
+ public void testEmptyWrite() throws Exception {
+ handleWriteTests(false);
+ }
+
+ @Test
+ public void testRealWrite() throws Exception {
+ handleWriteTests(true);
+ }
+
+ private static class MRTester extends Configured implements Tool {
+ private static class BadKeyMapper extends Mapper<Key,Value,Key,Value> {
+ int index = 0;
+
+ @Override
+ protected void map(Key key, Value value, Context context)
+ throws IOException, InterruptedException {
+ String table = context.getConfiguration().get("MRTester_tableName");
+ assertNotNull(table);
+ try {
+ try {
+ context.write(key, value);
+ if (index == 2)
+ fail();
+ } catch (Exception e) {
+ assertEquals(2, index);
+ }
+ } catch (AssertionError e) {
+ assertionErrors.put(table + "_map", e);
+ }
+ index++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ String table = context.getConfiguration().get("MRTester_tableName");
+ assertNotNull(table);
+ try {
+ assertEquals(2, index);
+ } catch (AssertionError e) {
+ assertionErrors.put(table + "_cleanup", e);
+ }
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 2) {
+ throw new IllegalArgumentException(
+ "Usage : " + MRTester.class.getName() + " <table> <outputfile>");
+ }
+
+ String table = args[0];
+ assertionErrors.put(table + "_map", new AssertionError("Dummy_map"));
+ assertionErrors.put(table + "_cleanup", new AssertionError("Dummy_cleanup"));
+
+ Job job = Job.getInstance(getConf(),
+ this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).store(job);
+ AccumuloFileOutputFormat.configure().outputPath(new Path(args[1])).sampler(SAMPLER_CONFIG)
+ .store(job);
+
+ job.setMapperClass(
+ table.endsWith("_mapreduce_bad_table") ? BadKeyMapper.class : Mapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+ job.getConfiguration().set("MRTester_tableName", table);
+
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.framework.name", "local");
+ conf.set("mapreduce.cluster.local.dir",
+ new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+ }
+ }
+
+ private void handleWriteTests(boolean content) throws Exception {
+ File f = folder.newFile(testName.getMethodName());
+ assertTrue(f.delete());
+ MRTester.main(new String[] {content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
+
+ assertTrue(f.exists());
+ File[] files = f.listFiles(file -> file.getName().startsWith("part-m-"));
+ assertNotNull(files);
+ if (content) {
+ assertEquals(1, files.length);
+ assertTrue(files[0].exists());
+
+ Configuration conf = CachedConfiguration.getInstance();
+ DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
+ FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder()
+ .forFile(files[0].toString(), FileSystem.getLocal(conf), conf)
+ .withTableConfiguration(acuconf).build()
+ .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
+ assertNotNull(sample);
+ } else {
+ assertEquals(0, files.length);
+ }
+ }
+
+ // Tracks errors in the MapReduce job. Each run seeds a dummy error for the map and cleanup
+ // phases (to ensure test correctness), so error tests should check for at least one error
+ // (possibly more, depending on the test) rather than zero.
+ private static Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create();
+
+ @Test
+ public void writeBadVisibility() throws Exception {
+ File f = folder.newFile(testName.getMethodName());
+ assertTrue(f.delete());
+ MRTester.main(new String[] {BAD_TABLE, f.getAbsolutePath()});
+ assertTrue(f.exists());
+ assertEquals(1, assertionErrors.get(BAD_TABLE + "_map").size());
+ assertEquals(1, assertionErrors.get(BAD_TABLE + "_cleanup").size());
+ }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/NewAccumuloInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloInputFormatIT.java
similarity index 87%
rename from hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/NewAccumuloInputFormatIT.java
rename to hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloInputFormatIT.java
index a775550..a806726 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/NewAccumuloInputFormatIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloInputFormatIT.java
@@ -48,7 +48,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions;
import org.apache.accumulo.hadoopImpl.mapreduce.BatchInputSplit;
import org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit;
import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -75,7 +75,7 @@ import com.google.common.collect.Multimap;
*
* @since 2.0
*/
-public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
AccumuloInputFormat inputFormat;
@@ -106,8 +106,8 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
insertData(table, currentTimeMillis());
Job job = Job.getInstance();
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
- .scanAuths(Authorizations.EMPTY).scanIsolation().build());
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).scanIsolation().store(job);
// split table
TreeSet<Text> splitsToAdd = new TreeSet<>();
@@ -126,14 +126,14 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
List<Range> ranges = new ArrayList<>();
for (Text text : actualSplits)
ranges.add(new Range(text));
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
- .scanAuths(Authorizations.EMPTY).ranges(ranges).build());
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).ranges(ranges).store(job);
splits = inputFormat.getSplits(job);
assertEquals(actualSplits.size(), splits.size());
// offline mode
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
- .scanAuths(Authorizations.EMPTY).offlineScan().build());
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).offlineScan().store(job);
try {
inputFormat.getSplits(job);
fail("An exception should have been thrown");
@@ -148,19 +148,19 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
for (int i = 0; i < 5; i++)
// overlapping ranges
ranges.add(new Range(String.format("%09d", i), String.format("%09d", i + 2)));
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
- .scanAuths(Authorizations.EMPTY).ranges(ranges).offlineScan().build());
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).ranges(ranges).offlineScan().store(job);
splits = inputFormat.getSplits(job);
assertEquals(2, splits.size());
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
- .scanAuths(Authorizations.EMPTY).disableAutoAdjustRanges().offlineScan().build());
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).disableAutoAdjustRanges().offlineScan().store(job);
splits = inputFormat.getSplits(job);
assertEquals(ranges.size(), splits.size());
// BatchScan not available for offline scans
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
- .scanAuths(Authorizations.EMPTY).batchScan().build());
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).batchScan().store(job);
try {
inputFormat.getSplits(job);
fail("An exception should have been thrown");
@@ -168,28 +168,28 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
// table online tests
client.tableOperations().online(table, true);
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
- .scanAuths(Authorizations.EMPTY).build());
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).store(job);
// test for resumption of success
splits = inputFormat.getSplits(job);
assertEquals(2, splits.size());
// BatchScan not available with isolated iterators
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
- .scanAuths(Authorizations.EMPTY).scanIsolation().build());
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).scanIsolation().store(job);
splits = inputFormat.getSplits(job);
assertEquals(2, splits.size());
// BatchScan not available with local iterators
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
- .scanAuths(Authorizations.EMPTY).localIterators().build());
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).localIterators().store(job);
splits = inputFormat.getSplits(job);
assertEquals(2, splits.size());
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table)
- .scanAuths(Authorizations.EMPTY).batchScan().build());
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).batchScan().store(job);
+ // Check we are getting back the correct type of split
splits = inputFormat.getSplits(job);
@@ -285,14 +285,14 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
job.setInputFormatClass(inputFormatClass);
- InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder()
- .clientInfo(getClientInfo()).table(table).scanAuths(Authorizations.EMPTY);
+ InputFormatOptions<Job> opts = AccumuloInputFormat.configure().clientInfo(getClientInfo())
+ .table(table).auths(Authorizations.EMPTY);
if (sample)
opts = opts.samplerConfiguration(SAMPLER_CONFIG);
if (batchScan)
- AccumuloInputFormat.setInfo(job, opts.batchScan().build());
+ opts.batchScan().store(job);
else
- AccumuloInputFormat.setInfo(job, opts.build());
+ opts.store(job);
job.setMapperClass(TestMapper.class);
job.setMapOutputKeyClass(Key.class);
@@ -406,10 +406,9 @@ public class NewAccumuloInputFormatIT extends AccumuloClusterHarness {
AccumuloClient accumuloClient = getAccumuloClient();
accumuloClient.tableOperations().create(table);
- InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder()
- .clientInfo(getClientInfo()).table(table).scanAuths(auths);
- AccumuloInputFormat.setInfo(job,
- opts.fetchColumns(fetchColumns).scanIsolation().localIterators().build());
+ InputFormatOptions<Job> opts = AccumuloInputFormat.configure().clientInfo(getClientInfo())
+ .table(table).auths(auths);
+ opts.fetchColumns(fetchColumns).scanIsolation().localIterators().store(job);
AccumuloInputFormat aif = new AccumuloInputFormat();
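The rename above also shows the shape of the new fluent API: the old two-step pattern of building an InputInfo and passing it to setInfo(job, info) collapses into a single chain that ends in store(job). A minimal sketch of the new style, assuming a ClientInfo named clientInfo is in scope and using a placeholder table name:

    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
    import org.apache.hadoop.mapreduce.Job;

    Job job = Job.getInstance();
    job.setInputFormatClass(AccumuloInputFormat.class);
    AccumuloInputFormat.configure()
        .clientInfo(clientInfo)        // connection details (assumed in scope)
        .table("table1")               // placeholder table name
        .auths(Authorizations.EMPTY)   // scan authorizations
        .store(job);                   // serializes the settings into the Job

Optional scan settings such as ranges(...), scanIsolation(), localIterators(), batchScan(), and offlineScan() slot into the same chain before store(job), as the test above exercises.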
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloOutputFormatIT.java
new file mode 100644
index 0000000..badebe9
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloOutputFormatIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloOutputFormatIT extends AccumuloClusterHarness {
+ private static AssertionError e1 = null;
+
+ private static class MRTester extends Configured implements Tool {
+ private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
+ Key key = null;
+ int count = 0;
+
+ @Override
+ protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+ try {
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+ assertEquals(new String(v.get()), String.format("%09x", count));
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ Mutation m = new Mutation("total");
+ m.put("", "", Integer.toString(count));
+ context.write(new Text(), m);
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 2) {
+ throw new IllegalArgumentException(
+ "Usage : " + MRTester.class.getName() + " <inputtable> <outputtable>");
+ }
+
+ String table1 = args[0];
+ String table2 = args[1];
+
+ Job job = Job.getInstance(getConf(),
+ this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.configure().clientInfo(getClientInfo()).table(table1)
+ .auths(Authorizations.EMPTY).store(job);
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormatClass(AccumuloOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Mutation.class);
+
+ AccumuloOutputFormat.configure().clientInfo(getClientInfo()).defaultTable(table2).store(job);
+
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.framework.name", "local");
+ conf.set("mapreduce.cluster.local.dir",
+ new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+ }
+ }
+
+ @Test
+ public void testMR() throws Exception {
+ String[] tableNames = getUniqueNames(2);
+ String table1 = tableNames[0];
+ String table2 = tableNames[1];
+ try (AccumuloClient c = getAccumuloClient()) {
+ c.tableOperations().create(table1);
+ c.tableOperations().create(table2);
+ BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ MRTester.main(new String[] {table1, table2});
+ assertNull(e1);
+
+ try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ assertTrue(iter.hasNext());
+ Entry<Key,Value> entry = iter.next();
+ assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+ assertFalse(iter.hasNext());
+ }
+ }
+ }
+}
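The output side follows the same pattern: AccumuloOutputFormat.configure() replaces the old OutputInfo builder, and defaultTable(...) names the table that receives mutations when the mapper emits an empty Text key, as TestMapper does above. A minimal sketch under those assumptions (clientInfo and the table name are placeholders):

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    Job job = Job.getInstance();
    job.setOutputFormatClass(AccumuloOutputFormat.class);
    job.setOutputKeyClass(Text.class);        // key selects the table; empty means default
    job.setOutputValueClass(Mutation.class);  // value is the mutation to apply
    AccumuloOutputFormat.configure()
        .clientInfo(clientInfo)               // assumed in scope
        .defaultTable("table2")               // placeholder table name
        .store(job);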
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java
index a125a7c..6a836cf 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloRowInputFormatIT.java
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.accumulo.hadoop.mapreduce.AccumuloRowInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -160,8 +159,8 @@ public class AccumuloRowInputFormatIT extends AccumuloClusterHarness {
job.setInputFormatClass(AccumuloRowInputFormat.class);
- AccumuloRowInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo())
- .table(table).scanAuths(Authorizations.EMPTY).build());
+ AccumuloRowInputFormat.configure().clientInfo(getClientInfo()).table(table)
+ .auths(Authorizations.EMPTY).store(job);
job.setMapperClass(TestMapper.class);
job.setMapOutputKeyClass(Key.class);
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
new file mode 100644
index 0000000..75c3efb
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+import com.beust.jcommander.Parameter;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class RowHashIT extends ConfigurableMacBase {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ public static final String hadoopTmpDirArg = "-Dhadoop.tmp.dir=" + System.getProperty("user.dir")
+ + "/target/hadoop-tmp";
+
+ static final String tablename = "mapredf";
+ static final String input_cf = "cf-HASHTYPE";
+ static final String input_cq = "cq-NOTHASHED";
+ static final String input_cfcq = input_cf + ":" + input_cq;
+ static final String output_cq = "cq-MD5BASE64";
+
+ @Test
+ public void test() throws Exception {
+ try (AccumuloClient client = getClient()) {
+ runTest(client, getCluster());
+ }
+ }
+
+ @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "md5 is okay for testing")
+ static void runTest(AccumuloClient c, MiniAccumuloClusterImpl cluster) throws AccumuloException,
+ AccumuloSecurityException, TableExistsException, TableNotFoundException,
+ MutationsRejectedException, IOException, InterruptedException, NoSuchAlgorithmException {
+ c.tableOperations().create(tablename);
+ BatchWriter bw = c.createBatchWriter(tablename, new BatchWriterConfig());
+ for (int i = 0; i < 10; i++) {
+ Mutation m = new Mutation("" + i);
+ m.put(input_cf, input_cq, "row" + i);
+ bw.addMutation(m);
+ }
+ bw.close();
+ Process hash = cluster.exec(RowHash.class, Collections.singletonList(hadoopTmpDirArg), "-i",
+ c.info().getInstanceName(), "-z", c.info().getZooKeepers(), "-u", "root", "-p",
+ ROOT_PASSWORD, "-t", tablename, "--column", input_cfcq);
+ assertEquals(0, hash.waitFor());
+
+ try (Scanner s = c.createScanner(tablename, Authorizations.EMPTY)) {
+ s.fetchColumn(new Text(input_cf), new Text(output_cq));
+ int i = 0;
+ for (Entry<Key,Value> entry : s) {
+ MessageDigest md = MessageDigest.getInstance("MD5");
+ byte[] check = Base64.getEncoder().encode(md.digest(("row" + i).getBytes()));
+ assertEquals(new String(check), entry.getValue().toString());
+ i++;
+ }
+ assertEquals(10, i); // all ten rows should have been hashed and verified
+ }
+ }
+
+ public static class RowHash extends Configured implements Tool {
+ /**
+ * The Mapper class that, for each input cell, writes back a Base64-encoded MD5 hash of its
+ * value.
+ */
+ public static class HashDataMapper extends Mapper<Key,Value,Text,Mutation> {
+ @Override
+ public void map(Key row, Value data, Context context)
+ throws IOException, InterruptedException {
+ Mutation m = new Mutation(row.getRow());
+ m.put(new Text("cf-HASHTYPE"), new Text("cq-MD5BASE64"),
+ new Value(Base64.getEncoder().encode(MD5Hash.digest(data.toString()).getDigest())));
+ context.write(null, m);
+ context.progress();
+ }
+
+ @Override
+ public void setup(Context job) {}
+ }
+
+ public static class Opts extends MapReduceClientOnRequiredTable {
+ @Parameter(names = "--column", required = true)
+ String column;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Job job = Job.getInstance(getConf());
+ job.setJobName(this.getClass().getName());
+ job.setJarByClass(this.getClass());
+ RowHash.Opts opts = new RowHash.Opts();
+ opts.parseArgs(RowHash.class.getName(), args);
+ job.setInputFormatClass(AccumuloInputFormat.class);
+ opts.setAccumuloConfigs(job);
+
+ String col = opts.column;
+ int idx = col.indexOf(":");
+ Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
+ Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
+ if (cf.getLength() > 0)
+ AccumuloInputFormat.configure().clientInfo(opts.getClientInfo()).table(opts.getTableName())
+ .auths(Authorizations.EMPTY)
+ .fetchColumns(Collections.singleton(new IteratorSetting.Column(cf, cq))).store(job);
+
+ job.setMapperClass(RowHash.HashDataMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Mutation.class);
+
+ job.setNumReduceTasks(0);
+
+ job.setOutputFormatClass(AccumuloOutputFormat.class);
+ AccumuloOutputFormat.configure().clientInfo(opts.getClientInfo()).store(job);
+
+ job.waitForCompletion(true);
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new Configuration(), new RowHash(), args);
+ }
+ }
+
+}
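The --column handling in run() splits a "cf:cq" spec on the first colon and passes the pieces to fetchColumns(...); a null qualifier restricts the scan to the whole column family rather than a single column. A compact sketch of that parsing with a placeholder spec:

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.hadoop.io.Text;

    String col = "cf-HASHTYPE:cq-NOTHASHED";  // placeholder column spec
    int idx = col.indexOf(':');
    Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
    Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
    // A null qualifier means "fetch the entire column family"
    IteratorSetting.Column column = new IteratorSetting.Column(cf, cq);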
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/TokenFileIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/TokenFileIT.java
new file mode 100644
index 0000000..ae29912
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/TokenFileIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class TokenFileIT extends AccumuloClusterHarness {
+ private static AssertionError e1 = null;
+
+ private static class MRTokenFileTester extends Configured implements Tool {
+ private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
+ Key key = null;
+ int count = 0;
+
+ @Override
+ protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+ try {
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+ assertEquals(new String(v.get()), String.format("%09x", count));
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ Mutation m = new Mutation("total");
+ m.put("", "", Integer.toString(count));
+ context.write(new Text(), m);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 3) {
+ throw new IllegalArgumentException("Usage : " + MRTokenFileTester.class.getName()
+ + " <token file> <inputtable> <outputtable>");
+ }
+
+ String tokenFile = args[0];
+ ClientInfo ci = ClientInfo.from(Paths.get(tokenFile));
+ String table1 = args[1];
+ String table2 = args[2];
+
+ Job job = Job.getInstance(getConf(),
+ this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.configure().clientInfo(ci).table(table1).auths(Authorizations.EMPTY)
+ .store(job);
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormatClass(AccumuloOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Mutation.class);
+
+ AccumuloOutputFormat.configure().clientInfo(ci).defaultTable(table2).store(job);
+
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
+ public static void main(String[] args) throws Exception {
+ Configuration conf = CachedConfiguration.getInstance();
+ conf.set("hadoop.tmp.dir", new File(args[0]).getParent());
+ conf.set("mapreduce.framework.name", "local");
+ conf.set("mapreduce.cluster.local.dir",
+ new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
+ }
+ }
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(
+ new File(System.getProperty("user.dir") + "/target"));
+
+ @Test
+ public void testMR() throws Exception {
+ String[] tableNames = getUniqueNames(2);
+ String table1 = tableNames[0];
+ String table2 = tableNames[1];
+ try (AccumuloClient c = getAccumuloClient()) {
+ c.tableOperations().create(table1);
+ c.tableOperations().create(table2);
+ BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ File tf = folder.newFile("client.properties");
+ try (PrintStream out = new PrintStream(tf)) {
+ getClientInfo().getProperties().store(out, "Credentials for " + getClass().getName());
+ }
+
+ MRTokenFileTester.main(new String[] {tf.getAbsolutePath(), table1, table2});
+ assertNull(e1);
+
+ try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ assertTrue(iter.hasNext());
+ Entry<Key,Value> entry = iter.next();
+ assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+ assertFalse(iter.hasNext());
+ }
+ }
+ }
+}
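This test exercises ClientInfo's file round trip: the harness stores its client properties to a file, and the job later reconstructs an equivalent ClientInfo from that path with ClientInfo.from(...). A minimal sketch of the round trip, assuming a ClientInfo named clientInfo is in scope and using a placeholder file name:

    import java.io.File;
    import java.io.PrintStream;
    import java.nio.file.Paths;
    import org.apache.accumulo.core.client.ClientInfo;

    File f = new File("client.properties");   // placeholder path
    try (PrintStream out = new PrintStream(f)) {
      clientInfo.getProperties().store(out, "saved client properties");
    }
    ClientInfo restored = ClientInfo.from(Paths.get(f.getAbsolutePath()));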
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java
index 7e1db66..c170918 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java
@@ -31,7 +31,6 @@ import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
-import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
@@ -57,10 +56,9 @@ public class AccumuloFileOutputFormatTest {
.addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build();
JobConf job = new JobConf();
- AccumuloFileOutputFormat.setInfo(job,
- FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
- .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig)
- .summarizers(sc1, sc2).build());
+ AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+ .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+ .summarizers(sc1, sc2).store(job);
AccumuloConfiguration acuconf = FileOutputConfigurator
.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job);
@@ -89,9 +87,9 @@ public class AccumuloFileOutputFormatTest {
samplerConfig.addOption("modulus", "100003");
job = new JobConf();
- AccumuloFileOutputFormat.setInfo(job,
- FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
- .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig).build());
+ AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+ .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+ .store(job);
acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job);
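The file output builder gathers the RFile tuning options that FileOutputInfo previously carried, with compressionType renamed to compression. A compact sketch against the legacy mapred API (the path, sizes, and codec below are hypothetical values, not the test's constants):

    import org.apache.accumulo.hadoop.mapred.AccumuloFileOutputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    JobConf job = new JobConf();
    AccumuloFileOutputFormat.configure()
        .outputPath(new Path("somewhere"))  // placeholder output directory
        .replication(7)                     // hypothetical replication factor
        .fileBlockSize(300L)                // hypothetical sizes, in bytes
        .dataBlockSize(50L)
        .indexBlockSize(10L)
        .compression("snappy")              // hypothetical codec name
        .store(job);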
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
index e204e8f..6831869 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
@@ -38,7 +38,7 @@ import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.hadoop.mapreduce.InputInfo;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -77,11 +77,11 @@ public class AccumuloInputFormatTest {
*/
@Test
public void testSetIterator() throws IOException {
- InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
- .table("test").scanAuths(Authorizations.EMPTY);
+ InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+ .table("test").auths(Authorizations.EMPTY);
IteratorSetting is = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
- AccumuloInputFormat.setInfo(job, opts.addIterator(is).build());
+ opts.addIterator(is).store(job);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
is.write(new DataOutputStream(baos));
String iterators = job.get("AccumuloInputFormat.ScanOpts.Iterators");
@@ -90,16 +90,15 @@ public class AccumuloInputFormatTest {
@Test
public void testAddIterator() {
- InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
- .table("test").scanAuths(Authorizations.EMPTY);
+ InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+ .table("test").auths(Authorizations.EMPTY);
IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class);
IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class);
iter3.addOption("v1", "1");
iter3.addOption("junk", "\0omg:!\\xyzzy");
- AccumuloInputFormat.setInfo(job,
- opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+ opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
@@ -143,9 +142,9 @@ public class AccumuloInputFormatTest {
IteratorSetting iter1 = new IteratorSetting(1, "iter1", WholeRowIterator.class);
iter1.addOption(key, value);
// also test if reusing options will create duplicate iterators
- InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
- .table("test").scanAuths(Authorizations.EMPTY);
- AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).build());
+ InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+ .table("test").auths(Authorizations.EMPTY);
+ opts.addIterator(iter1).store(job);
List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
assertEquals(1, list.size());
@@ -155,7 +154,7 @@ public class AccumuloInputFormatTest {
IteratorSetting iter2 = new IteratorSetting(1, "iter2", WholeRowIterator.class);
iter2.addOption(key, value);
iter2.addOption(key + "2", value);
- AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).addIterator(iter2).build());
+ opts.addIterator(iter1).addIterator(iter2).store(job);
list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
assertEquals(2, list.size());
assertEquals(1, list.get(0).getOptions().size());
@@ -173,9 +172,8 @@ public class AccumuloInputFormatTest {
IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class.getName());
IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class.getName());
IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class.getName());
- AccumuloInputFormat.setInfo(job,
- InputInfo.builder().clientInfo(clientInfo).table("test").scanAuths(Authorizations.EMPTY)
- .addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+ AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+ .addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class, job);
@@ -206,8 +204,8 @@ public class AccumuloInputFormatTest {
IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
RegExFilter.setRegexs(is, regex, null, null, null, false);
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
- .scanAuths(Authorizations.EMPTY).addIterator(is).build());
+ AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+ .addIterator(is).store(job);
assertEquals(regex,
InputConfigurator.getIterators(AccumuloInputFormat.class, job).get(0).getName());
@@ -221,8 +219,8 @@ public class AccumuloInputFormatTest {
cols.add(new IteratorSetting.Column(new Text(""), new Text("bar")));
cols.add(new IteratorSetting.Column(new Text(""), new Text("")));
cols.add(new IteratorSetting.Column(new Text("foo"), new Text("")));
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
- .scanAuths(Authorizations.EMPTY).fetchColumns(cols).build());
+ AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+ .fetchColumns(cols).store(job);
assertEquals(cols, InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job));
}
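Note the parameterized options type: this mapred test binds InputFormatOptions<JobConf>, while the mapreduce tests below bind InputFormatOptions<Job>, so one builder serves both Hadoop APIs and store(...) only accepts the matching job type. A sketch of the mapred flavor, assuming a ClientInfo named clientInfo:

    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
    import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions;
    import org.apache.hadoop.mapred.JobConf;

    JobConf job = new JobConf();
    InputFormatOptions<JobConf> opts = AccumuloInputFormat.configure()
        .clientInfo(clientInfo)          // assumed in scope
        .table("test")
        .auths(Authorizations.EMPTY);
    opts.store(job);                     // a JobConf here; a Job in the mapreduce API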
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
index 5811ebc..0f9f777 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
@@ -16,21 +16,16 @@
*/
package org.apache.accumulo.hadoop.mapred;
-import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.getBatchWriterOptions;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
-import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Test;
@@ -38,14 +33,11 @@ import org.junit.Test;
public class AccumuloOutputFormatTest {
@Test
public void testBWSettings() throws IOException {
- ClientInfo clientInfo = createMock(ClientInfo.class);
- AuthenticationToken token = createMock(AuthenticationToken.class);
- Properties props = createMock(Properties.class);
- expect(clientInfo.getAuthenticationToken()).andReturn(token).anyTimes();
- expect(clientInfo.getProperties()).andReturn(props).anyTimes();
- replay(clientInfo);
JobConf job = new JobConf();
+ AccumuloClient.ConnectionOptions opts = Accumulo.newClient().to("test", "zk").as("blah",
+ "blah");
+
// make sure we aren't testing defaults
final BatchWriterConfig bwDefaults = new BatchWriterConfig();
assertNotEquals(7654321L, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
@@ -58,13 +50,14 @@ public class AccumuloOutputFormatTest {
bwConfig.setTimeout(9898989L, TimeUnit.MILLISECONDS);
bwConfig.setMaxWriteThreads(42);
bwConfig.setMaxMemory(1123581321L);
- AccumuloOutputFormat.setInfo(job,
- OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build());
+ opts.batchWriterConfig(bwConfig);
+ AccumuloOutputFormat.configure().clientInfo(opts.info()).store(job);
AccumuloOutputFormat myAOF = new AccumuloOutputFormat() {
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
- BatchWriterConfig bwOpts = getBatchWriterOptions(job);
+ BatchWriterConfig bwOpts = OutputConfigurator
+ .getBatchWriterOptions(AccumuloOutputFormat.class, job);
// passive check
assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS),
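The rewrite here reflects the removal of batch writer options from the MR output API: the settings now travel with the ClientInfo produced by Accumulo.newClient(), and OutputConfigurator reads them back out of the stored job. A minimal sketch of that flow (instance, zookeepers, and credentials are placeholders):

    import java.util.concurrent.TimeUnit;
    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.ClientInfo;

    BatchWriterConfig bwConfig = new BatchWriterConfig();
    bwConfig.setMaxLatency(7654321L, TimeUnit.MILLISECONDS);
    bwConfig.setMaxWriteThreads(42);
    ClientInfo info = Accumulo.newClient()
        .to("instance", "zookeeper:2181")   // placeholder connection
        .as("user", "password")             // placeholder credentials
        .batchWriterConfig(bwConfig)
        .info();
    // info now carries the batch writer settings into AccumuloOutputFormat.configure()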
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java
index ea74826..7aacc14 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java
@@ -56,10 +56,9 @@ public class AccumuloFileOutputFormatTest {
.addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build();
Job job1 = Job.getInstance();
- AccumuloFileOutputFormat.setInfo(job1,
- FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
- .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig)
- .summarizers(sc1, sc2).build());
+ AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+ .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+ .summarizers(sc1, sc2).store(job1);
AccumuloConfiguration acuconf = FileOutputConfigurator
.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job1.getConfiguration());
@@ -88,9 +87,9 @@ public class AccumuloFileOutputFormatTest {
samplerConfig.addOption("modulus", "100003");
Job job2 = Job.getInstance();
- AccumuloFileOutputFormat.setInfo(job2,
- FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b)
- .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig).build());
+ AccumuloFileOutputFormat.configure().outputPath(new Path("somewhere")).replication(a)
+ .fileBlockSize(b).dataBlockSize(c).indexBlockSize(d).compression(e).sampler(samplerConfig)
+ .store(job2);
acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class,
job2.getConfiguration());
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
index 4de275b..dfac630 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -64,11 +65,10 @@ public class AccumuloInputFormatTest {
@Test
public void testSetIterator() throws IOException {
Job job = Job.getInstance();
- InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
- .table("test").scanAuths(Authorizations.EMPTY);
IteratorSetting is = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
- AccumuloInputFormat.setInfo(job, opts.addIterator(is).build());
+ AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+ .addIterator(is).store(job);
Configuration conf = job.getConfiguration();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
is.write(new DataOutputStream(baos));
@@ -79,16 +79,14 @@ public class AccumuloInputFormatTest {
@Test
public void testAddIterator() throws IOException {
Job job = Job.getInstance();
- InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
- .table("test").scanAuths(Authorizations.EMPTY);
IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class);
IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class);
IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class);
iter3.addOption("v1", "1");
iter3.addOption("junk", "\0omg:!\\xyzzy");
- AccumuloInputFormat.setInfo(job,
- opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+ AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+ .addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class,
job.getConfiguration());
@@ -134,9 +132,9 @@ public class AccumuloInputFormatTest {
iter1.addOption(key, value);
Job job = Job.getInstance();
// also test if reusing options will create duplicate iterators
- InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo)
- .table("test").scanAuths(Authorizations.EMPTY);
- AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).build());
+ InputFormatOptions<Job> opts = AccumuloInputFormat.configure().clientInfo(clientInfo)
+ .table("test").auths(Authorizations.EMPTY);
+ opts.addIterator(iter1).store(job);
List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class,
job.getConfiguration());
@@ -147,7 +145,7 @@ public class AccumuloInputFormatTest {
IteratorSetting iter2 = new IteratorSetting(1, "iter2", WholeRowIterator.class);
iter2.addOption(key, value);
iter2.addOption(key + "2", value);
- AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).addIterator(iter2).build());
+ opts.addIterator(iter1).addIterator(iter2).store(job);
list = InputConfigurator.getIterators(AccumuloInputFormat.class, job.getConfiguration());
assertEquals(2, list.size());
assertEquals(1, list.get(0).getOptions().size());
@@ -167,9 +165,8 @@ public class AccumuloInputFormatTest {
IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class.getName());
IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class.getName());
IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class.getName());
- AccumuloInputFormat.setInfo(job,
- InputInfo.builder().clientInfo(clientInfo).table("test").scanAuths(Authorizations.EMPTY)
- .addIterator(iter1).addIterator(iter2).addIterator(iter3).build());
+ AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+ .addIterator(iter1).addIterator(iter2).addIterator(iter3).store(job);
List<IteratorSetting> list = InputConfigurator.getIterators(AccumuloInputFormat.class,
job.getConfiguration());
@@ -203,8 +200,8 @@ public class AccumuloInputFormatTest {
IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
RegExFilter.setRegexs(is, regex, null, null, null, false);
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
- .scanAuths(Authorizations.EMPTY).addIterator(is).build());
+ AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+ .addIterator(is).store(job);
assertEquals(regex, InputConfigurator
.getIterators(AccumuloInputFormat.class, job.getConfiguration()).get(0).getName());
@@ -219,8 +216,8 @@ public class AccumuloInputFormatTest {
cols.add(new IteratorSetting.Column(new Text(""), new Text("bar")));
cols.add(new IteratorSetting.Column(new Text(""), new Text("")));
cols.add(new IteratorSetting.Column(new Text("foo"), new Text("")));
- AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test")
- .scanAuths(Authorizations.EMPTY).fetchColumns(cols).build());
+ AccumuloInputFormat.configure().clientInfo(clientInfo).table("test").auths(Authorizations.EMPTY)
+ .fetchColumns(cols).store(job);
assertEquals(cols,
InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job.getConfiguration()));
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
index e698b3a..52aace9 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
@@ -16,20 +16,16 @@
*/
package org.apache.accumulo.hadoop.mapreduce;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
-import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientInfo;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.junit.Test;
@@ -38,14 +34,11 @@ public class AccumuloOutputFormatTest {
@Test
public void testBWSettings() throws IOException {
- ClientInfo clientInfo = createMock(ClientInfo.class);
- AuthenticationToken token = createMock(AuthenticationToken.class);
- Properties props = createMock(Properties.class);
- expect(clientInfo.getAuthenticationToken()).andReturn(token).anyTimes();
- expect(clientInfo.getProperties()).andReturn(props).anyTimes();
- replay(clientInfo);
Job job = Job.getInstance();
+ AccumuloClient.ConnectionOptions opts = Accumulo.newClient().to("test", "zk").as("blah",
+ "blah");
+
// make sure we aren't testing defaults
final BatchWriterConfig bwDefaults = new BatchWriterConfig();
assertNotEquals(7654321L, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
@@ -58,13 +51,14 @@ public class AccumuloOutputFormatTest {
bwConfig.setTimeout(9898989L, TimeUnit.MILLISECONDS);
bwConfig.setMaxWriteThreads(42);
bwConfig.setMaxMemory(1123581321L);
- AccumuloOutputFormat.setInfo(job,
- OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build());
+ opts.batchWriterConfig(bwConfig);
+ AccumuloOutputFormat.configure().clientInfo(opts.info()).store(job);
AccumuloOutputFormat myAOF = new AccumuloOutputFormat() {
@Override
public void checkOutputSpecs(JobContext job) throws IOException {
- BatchWriterConfig bwOpts = AccumuloOutputFormatImpl.getBatchWriterOptions(job);
+ BatchWriterConfig bwOpts = OutputConfigurator
+ .getBatchWriterOptions(AccumuloOutputFormat.class, job.getConfiguration());
// passive check
assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS),
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
index de722ef..ba0fb85 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -205,6 +206,11 @@ public class ConfigurableMacBase extends AccumuloITBase {
.as("root", ROOT_PASSWORD).info();
}
+ protected ClientInfo getClientInfo(BatchWriterConfig bwConfig) {
+ return Accumulo.newClient().to(getCluster().getInstanceName(), getCluster().getZooKeepers())
+ .as("root", ROOT_PASSWORD).batchWriterConfig(bwConfig).info();
+ }
+
protected ServerContext getServerContext() {
return getCluster().getServerContext();
}
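With the new overload, ITs built on ConfigurableMacBase can thread batch writer settings through the harness-supplied ClientInfo. A short sketch to be called from within a subclass (the buffer size is arbitrary):

    BatchWriterConfig bw = new BatchWriterConfig();
    bw.setMaxMemory(1024 * 1024);          // hypothetical 1 MB mutation buffer
    ClientInfo info = getClientInfo(bw);   // resolves against the mini cluster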