Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/12/19 20:01:18 UTC
[accumulo] branch master updated: Multi table input for new MR.
Closes #749 (#821)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 14e0bd6 Multi table input for new MR. Closes #749 (#821)
14e0bd6 is described below
commit 14e0bd6f671ad12eada7a62aaa49243ca7eb000d
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Dec 19 15:01:13 2018 -0500
Multi table input for new MR. Closes #749 (#821)
* Modified new MapReduce builder and implementation to allow multiple
tables through the same fluent API as AccumuloInputFormat
* Replaced setIterators method with addIterator
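As a worked illustration of the new API, here is a minimal sketch of a multi-table job setup; the table names, client properties, and iterator class below are hypothetical placeholders, not part of this commit:

import java.util.Properties;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.mapreduce.Job;

public class MultiTableJobSetup {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    // assumption: props would be populated with real Accumulo client connection info
    Properties props = new Properties();
    // hypothetical iterator; addIterator replaces the removed setIterators method
    IteratorSetting iter = new IteratorSetting(50, "myIter", "com.example.MyIterator");

    AccumuloInputFormat.configure()
        .clientProperties(props)                      // client properties set once
        .table("table1").auths(Authorizations.EMPTY)  // options after .table() bind to table1
            .addIterator(iter)
        .table("table2").auths(Authorizations.EMPTY)  // a second table, with its own options
        .store(job);                                  // serialize every table config into the job
  }
}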
---
.../hadoop/mapred/AccumuloInputFormat.java | 4 +-
.../hadoop/mapred/AccumuloRowInputFormat.java | 2 +-
.../hadoop/mapreduce/AccumuloInputFormat.java | 11 ++
.../hadoop/mapreduce/InputFormatBuilder.java | 15 +-
.../hadoopImpl/mapred/AbstractInputFormat.java | 41 +----
.../hadoopImpl/mapreduce/AbstractInputFormat.java | 35 +---
.../mapreduce/InputFormatBuilderImpl.java | 198 +++++++++++----------
.../hadoopImpl/mapreduce/InputTableConfig.java | 98 +++++-----
.../mapreduce/lib/InputConfigurator.java | 2 +-
.../hadoop/its/mapred/MultiTableInputFormatIT.java | 152 ++++++++++++++++
.../its/mapreduce/MultiTableInputFormatIT.java | 152 ++++++++++++++++
.../hadoop/mapred/AccumuloInputFormatTest.java | 11 ++
.../hadoop/mapred/MultiTableInputFormatTest.java | 123 +++++++++++++
.../hadoop/mapreduce/AccumuloInputFormatTest.java | 13 ++
.../mapreduce/MultiTableInputFormatTest.java | 125 +++++++++++++
.../hadoopImpl/mapreduce/InputTableConfigTest.java | 2 +-
16 files changed, 769 insertions(+), 215 deletions(-)
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
index 6fabcf9..2523a0d 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
@@ -54,7 +54,7 @@ public class AccumuloInputFormat implements InputFormat<Key,Value> {
*/
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- return AbstractInputFormat.getSplits(job, numSplits);
+ return AbstractInputFormat.getSplits(job);
}
@Override
@@ -96,6 +96,6 @@ public class AccumuloInputFormat implements InputFormat<Key,Value> {
* Sets all the information required for this map reduce job.
*/
public static InputFormatBuilder.ClientParams<JobConf> configure() {
- return new InputFormatBuilderImpl<JobConf>(CLASS);
+ return new InputFormatBuilderImpl<>(CLASS);
}
}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
index bdcfbdb..cb1d650 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
@@ -53,7 +53,7 @@ public class AccumuloRowInputFormat implements InputFormat<Text,PeekingIterator<
*/
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- return AbstractInputFormat.getSplits(job, numSplits);
+ return AbstractInputFormat.getSplits(job);
}
@Override
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
index 2bffe35..2c6259d 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
@@ -49,6 +49,17 @@ import org.slf4j.LoggerFactory;
* .store(job);
* </pre>
*
+ * Multiple tables can be set by configuring clientProperties once and then calling .table() for
+ * each table. The methods following a call to .table() apply only to that table. For example:
+ *
+ * <pre>
+ * AccumuloInputFormat.configure().clientProperties(props) // set client props once
+ * .table(table1).auths(auths1).fetchColumns(cols1).batchScan(true) // options for table1
+ * .table(table2).ranges(range2).auths(auths2).addIterator(iter2) // options for table2
+ * .table(table3).ranges(range3).auths(auths3).addIterator(iter3) // options for table3
+ * .store(job); // store all tables in the job when finished
+ * </pre>
+ *
* For descriptions of all options see
* {@link org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions}
*
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
index 4677bda..ddb4deb 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
@@ -62,12 +62,18 @@ public interface InputFormatBuilder {
*/
interface TableParams<T> {
/**
- * Sets the name of the input table, over which this job will scan.
+ * Sets the name of the input table over which this job will scan. At least one table is
+ * required before calling store(Job).
*
* @param tableName
* the table from which this job will read
*/
InputFormatOptions<T> table(String tableName);
+
+ /**
+ * Finish configuring, verify and serialize options into the JobConf or Job
+ */
+ void store(T j) throws AccumuloException, AccumuloSecurityException;
}
/**
@@ -75,7 +81,7 @@ public interface InputFormatBuilder {
*
* @since 2.0
*/
- interface InputFormatOptions<T> {
+ interface InputFormatOptions<T> extends TableParams<T> {
/**
* Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations.
* By default, all of the user's auths are set.
@@ -217,10 +223,5 @@ public interface InputFormatBuilder {
* By default, this feature is <b>disabled</b>.
*/
InputFormatOptions<T> batchScan(boolean value);
-
- /**
- * Finish configuring, verify and serialize options into the JobConf or Job
- */
- void store(T j) throws AccumuloException, AccumuloSecurityException;
}
}
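With store(T) moved into TableParams and InputFormatOptions extending it, a fluent chain can, after any per-table option, either call .table() again to begin the next table or .store() to finish. A minimal compilable sketch of the resulting shape, with simplified signatures assumed for illustration:

interface TableParams<T> {
  InputFormatOptions<T> table(String tableName); // begin per-table options for one input table

  void store(T job); // finish: verify and serialize all configured tables into the job
}

interface InputFormatOptions<T> extends TableParams<T> {
  InputFormatOptions<T> batchScan(boolean value); // one representative per-table option

  // table(...) and store(...) are inherited, so the chain can switch tables or finish here
}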
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
index fc34f3f..4d1d4ea 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
@@ -201,35 +201,6 @@ public abstract class AbstractInputFormat {
}
/**
- * Fetches all {@link InputTableConfig}s that have been set on the given Hadoop job.
- *
- * @param job
- * the Hadoop job instance to be configured
- * @return the {@link InputTableConfig} objects set on the job
- * @since 1.6.0
- */
- protected static Map<String,InputTableConfig> getInputTableConfigs(JobConf job) {
- return InputConfigurator.getInputTableConfigs(CLASS, job);
- }
-
- /**
- * Fetches a {@link InputTableConfig} that has been set on the configuration for a specific table.
- *
- * <p>
- * null is returned in the event that the table doesn't exist.
- *
- * @param job
- * the Hadoop job instance to be configured
- * @param tableName
- * the table name for which to grab the config object
- * @return the {@link InputTableConfig} for the given table
- * @since 1.6.0
- */
- protected static InputTableConfig getInputTableConfig(JobConf job, String tableName) {
- return InputConfigurator.getInputTableConfig(CLASS, job, tableName);
- }
-
- /**
* An abstract base class to be used to create {@link org.apache.hadoop.mapred.RecordReader}
* instances that convert from Accumulo
* {@link org.apache.accumulo.core.data.Key}/{@link org.apache.accumulo.core.data.Value} pairs to
@@ -305,7 +276,8 @@ public abstract class AbstractInputFormat {
// in case the table name changed, we can still use the previous name for terms of
// configuration, but the scanner will use the table id resolved at job setup time
- InputTableConfig tableConfig = getInputTableConfig(job, baseSplit.getTableName());
+ InputTableConfig tableConfig = InputConfigurator.getInputTableConfig(CLASS, job,
+ baseSplit.getTableName());
log.debug("Created client with user: " + context.whoami());
log.debug("Creating scanner for table: " + table);
@@ -452,18 +424,13 @@ public abstract class AbstractInputFormat {
/**
* Gets the splits of the tables that have been set on the job by reading the metadata table for
* the specified ranges.
- *
- * @return the splits from the tables based on the ranges.
- * @throws java.io.IOException
- * if a table set on the job doesn't exist or an error occurs initializing the tablet
- * locator
*/
- public static InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ public static InputSplit[] getSplits(JobConf job) throws IOException {
validateOptions(job);
Random random = new SecureRandom();
LinkedList<InputSplit> splits = new LinkedList<>();
- Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(job);
+ Map<String,InputTableConfig> tableConfigs = InputConfigurator.getInputTableConfigs(CLASS, job);
try (AccumuloClient client = createClient(job)) {
for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
String tableName = tableConfigEntry.getKey();
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
index c6702b6..b845ff8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
@@ -182,35 +182,6 @@ public abstract class AbstractInputFormat {
return InputConfigurator.getScanAuthorizations(CLASS, context.getConfiguration());
}
- /**
- * Fetches all {@link InputTableConfig}s that have been set on the given job.
- *
- * @param context
- * the Hadoop job instance to be configured
- * @return the {@link InputTableConfig} objects for the job
- * @since 1.6.0
- */
- public static Map<String,InputTableConfig> getInputTableConfigs(JobContext context) {
- return InputConfigurator.getInputTableConfigs(CLASS, context.getConfiguration());
- }
-
- /**
- * Fetches a {@link InputTableConfig} that has been set on the configuration for a specific table.
- *
- * <p>
- * null is returned in the event that the table doesn't exist.
- *
- * @param context
- * the Hadoop job instance to be configured
- * @param tableName
- * the table name for which to grab the config object
- * @return the {@link InputTableConfig} for the given table
- * @since 1.6.0
- */
- protected static InputTableConfig getInputTableConfig(JobContext context, String tableName) {
- return InputConfigurator.getInputTableConfig(CLASS, context.getConfiguration(), tableName);
- }
-
// InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
/**
* Check whether a configuration is fully configured to be used with an Accumulo
@@ -310,7 +281,8 @@ public abstract class AbstractInputFormat {
// in case the table name changed, we can still use the previous name for terms of
// configuration,
// but the scanner will use the table id resolved at job setup time
- InputTableConfig tableConfig = getInputTableConfig(attempt, split.getTableName());
+ InputTableConfig tableConfig = InputConfigurator.getInputTableConfig(CLASS,
+ attempt.getConfiguration(), split.getTableName());
log.debug("Creating client with user: " + client.whoami());
log.debug("Creating scanner for table: " + table);
@@ -476,7 +448,8 @@ public abstract class AbstractInputFormat {
Random random = new SecureRandom();
LinkedList<InputSplit> splits = new LinkedList<>();
try (AccumuloClient client = createClient(context)) {
- Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(context);
+ Map<String,InputTableConfig> tableConfigs = InputConfigurator.getInputTableConfigs(CLASS,
+ context.getConfiguration());
for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
String tableName = tableConfigEntry.getKey();
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
index 3d85205..0af63d8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
@@ -19,9 +19,9 @@ package org.apache.accumulo.hadoopImpl.mapreduce;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Properties;
import org.apache.accumulo.core.client.Accumulo;
@@ -46,17 +46,10 @@ public class InputFormatBuilderImpl<T>
InputFormatBuilder.TableParams<T>, InputFormatBuilder.InputFormatOptions<T> {
Class<?> callingClass;
- String tableName;
ClientInfo clientInfo;
- Authorizations scanAuths;
- Optional<String> context = Optional.empty();
- Collection<Range> ranges = Collections.emptyList();
- Collection<IteratorSetting.Column> fetchColumns = Collections.emptyList();
- Map<String,IteratorSetting> iterators = Collections.emptyMap();
- Optional<SamplerConfiguration> samplerConfig = Optional.empty();
- Map<String,String> hints = Collections.emptyMap();
- BuilderBooleans bools = new BuilderBooleans();
+ String currentTable;
+ Map<String,InputTableConfig> tableConfigMap = Collections.emptyMap();
public InputFormatBuilderImpl(Class<?> callingClass) {
this.callingClass = callingClass;
@@ -71,38 +64,44 @@ public class InputFormatBuilderImpl<T>
@Override
public InputFormatBuilder.InputFormatOptions<T> table(String tableName) {
- this.tableName = Objects.requireNonNull(tableName, "Table name must not be null");
+ this.currentTable = Objects.requireNonNull(tableName, "Table name must not be null");
+ if (tableConfigMap.isEmpty())
+ tableConfigMap = new LinkedHashMap<>();
+ tableConfigMap.put(currentTable, new InputTableConfig());
return this;
}
@Override
public InputFormatBuilder.InputFormatOptions<T> auths(Authorizations auths) {
- this.scanAuths = Objects.requireNonNull(auths, "Authorizations must not be null");
+ tableConfigMap.get(currentTable)
+ .setScanAuths(Objects.requireNonNull(auths, "Authorizations must not be null"));
return this;
}
@Override
public InputFormatBuilder.InputFormatOptions<T> classLoaderContext(String context) {
- this.context = Optional.of(context);
+ tableConfigMap.get(currentTable).setContext(context);
return this;
}
@Override
public InputFormatBuilder.InputFormatOptions<T> ranges(Collection<Range> ranges) {
- this.ranges = ImmutableList
+ List<Range> newRanges = ImmutableList
.copyOf(Objects.requireNonNull(ranges, "Collection of ranges is null"));
- if (this.ranges.size() == 0)
+ if (newRanges.size() == 0)
throw new IllegalArgumentException("Specified collection of ranges is empty.");
+ tableConfigMap.get(currentTable).setRanges(newRanges);
return this;
}
@Override
public InputFormatBuilder.InputFormatOptions<T> fetchColumns(
Collection<IteratorSetting.Column> fetchColumns) {
- this.fetchColumns = ImmutableList
+ Collection<IteratorSetting.Column> newFetchColumns = ImmutableList
.copyOf(Objects.requireNonNull(fetchColumns, "Collection of fetch columns is null"));
- if (this.fetchColumns.size() == 0)
+ if (newFetchColumns.size() == 0)
throw new IllegalArgumentException("Specified collection of fetch columns is empty.");
+ tableConfigMap.get(currentTable).fetchColumns(newFetchColumns);
return this;
}
@@ -110,57 +109,56 @@ public class InputFormatBuilderImpl<T>
public InputFormatBuilder.InputFormatOptions<T> addIterator(IteratorSetting cfg) {
// store iterators by name to prevent duplicates
Objects.requireNonNull(cfg, "IteratorSetting must not be null.");
- if (this.iterators.size() == 0)
- this.iterators = new LinkedHashMap<>();
- this.iterators.put(cfg.getName(), cfg);
+ tableConfigMap.get(currentTable).addIterator(cfg);
return this;
}
@Override
public InputFormatBuilder.InputFormatOptions<T> executionHints(Map<String,String> hints) {
- this.hints = ImmutableMap
+ Map<String,String> newHints = ImmutableMap
.copyOf(Objects.requireNonNull(hints, "Map of execution hints must not be null."));
- if (hints.size() == 0)
+ if (newHints.size() == 0)
throw new IllegalArgumentException("Specified map of execution hints is empty.");
+ tableConfigMap.get(currentTable).setExecutionHints(newHints);
return this;
}
@Override
public InputFormatBuilder.InputFormatOptions<T> samplerConfiguration(
SamplerConfiguration samplerConfig) {
- this.samplerConfig = Optional.of(samplerConfig);
+ tableConfigMap.get(currentTable).setSamplerConfiguration(samplerConfig);
return this;
}
@Override
public InputFormatOptions<T> autoAdjustRanges(boolean value) {
- bools.autoAdjustRanges = value;
+ tableConfigMap.get(currentTable).setAutoAdjustRanges(value);
return this;
}
@Override
public InputFormatOptions<T> scanIsolation(boolean value) {
- bools.scanIsolation = value;
+ tableConfigMap.get(currentTable).setUseIsolatedScanners(value);
return this;
}
@Override
public InputFormatOptions<T> localIterators(boolean value) {
- bools.localIters = value;
+ tableConfigMap.get(currentTable).setUseLocalIterators(value);
return this;
}
@Override
public InputFormatOptions<T> offlineScan(boolean value) {
- bools.offlineScan = value;
+ tableConfigMap.get(currentTable).setOfflineScan(value);
return this;
}
@Override
public InputFormatOptions<T> batchScan(boolean value) {
- bools.batchScan = value;
+ tableConfigMap.get(currentTable).setUseBatchScan(value);
if (value)
- bools.autoAdjustRanges = true;
+ tableConfigMap.get(currentTable).setAutoAdjustRanges(true);
return this;
}
@@ -180,30 +178,40 @@ public class InputFormatBuilderImpl<T>
*/
private void store(Job job) throws AccumuloException, AccumuloSecurityException {
AbstractInputFormat.setClientInfo(job, clientInfo);
- InputFormatBase.setInputTableName(job, tableName);
-
- scanAuths = getUserAuths(scanAuths, clientInfo);
- AbstractInputFormat.setScanAuthorizations(job, scanAuths);
-
- // all optional values
- if (context.isPresent())
- AbstractInputFormat.setClassLoaderContext(job, context.get());
- if (ranges.size() > 0)
- InputFormatBase.setRanges(job, ranges);
- if (iterators.size() > 0)
- InputConfigurator.writeIteratorsToConf(callingClass, job.getConfiguration(),
- iterators.values());
- if (fetchColumns.size() > 0)
- InputConfigurator.fetchColumns(callingClass, job.getConfiguration(), fetchColumns);
- if (samplerConfig.isPresent())
- InputFormatBase.setSamplerConfiguration(job, samplerConfig.get());
- if (hints.size() > 0)
- InputFormatBase.setExecutionHints(job, hints);
- InputFormatBase.setAutoAdjustRanges(job, bools.autoAdjustRanges);
- InputFormatBase.setScanIsolation(job, bools.scanIsolation);
- InputFormatBase.setLocalIterators(job, bools.localIters);
- InputFormatBase.setOfflineTableScan(job, bools.offlineScan);
- InputFormatBase.setBatchScan(job, bools.batchScan);
+ if (tableConfigMap.size() == 0) {
+ throw new IllegalArgumentException("At least one Table must be configured for job.");
+ }
+ // if only one table use the single table configuration method
+ if (tableConfigMap.size() == 1) {
+ Map.Entry<String,InputTableConfig> entry = tableConfigMap.entrySet().iterator().next();
+ InputFormatBase.setInputTableName(job, entry.getKey());
+ InputTableConfig config = entry.getValue();
+ if (!config.getScanAuths().isPresent())
+ config.setScanAuths(getUserAuths(clientInfo));
+ AbstractInputFormat.setScanAuthorizations(job, config.getScanAuths().get());
+ // all optional values
+ if (config.getContext().isPresent())
+ AbstractInputFormat.setClassLoaderContext(job, config.getContext().get());
+ if (config.getRanges().size() > 0)
+ InputFormatBase.setRanges(job, config.getRanges());
+ if (config.getIterators().size() > 0)
+ InputConfigurator.writeIteratorsToConf(callingClass, job.getConfiguration(),
+ config.getIterators());
+ if (config.getFetchedColumns().size() > 0)
+ InputConfigurator.fetchColumns(callingClass, job.getConfiguration(),
+ config.getFetchedColumns());
+ if (config.getSamplerConfiguration() != null)
+ InputFormatBase.setSamplerConfiguration(job, config.getSamplerConfiguration());
+ if (config.getExecutionHints().size() > 0)
+ InputFormatBase.setExecutionHints(job, config.getExecutionHints());
+ InputFormatBase.setAutoAdjustRanges(job, config.shouldAutoAdjustRanges());
+ InputFormatBase.setScanIsolation(job, config.shouldUseIsolatedScanners());
+ InputFormatBase.setLocalIterators(job, config.shouldUseLocalIterators());
+ InputFormatBase.setOfflineTableScan(job, config.isOfflineScan());
+ InputFormatBase.setBatchScan(job, config.shouldBatchScan());
+ } else {
+ InputConfigurator.setInputTableConfigs(callingClass, job.getConfiguration(), tableConfigMap);
+ }
}
/**
@@ -211,52 +219,56 @@ public class InputFormatBuilderImpl<T>
*/
private void store(JobConf jobConf) throws AccumuloException, AccumuloSecurityException {
org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo(jobConf, clientInfo);
- org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName(jobConf, tableName);
-
- scanAuths = getUserAuths(scanAuths, clientInfo);
- org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations(jobConf,
- scanAuths);
-
- // all optional values
- if (context.isPresent())
- org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext(jobConf,
- context.get());
- if (ranges.size() > 0)
- org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges(jobConf, ranges);
- if (iterators.size() > 0)
- InputConfigurator.writeIteratorsToConf(callingClass, jobConf, iterators.values());
- if (fetchColumns.size() > 0)
- InputConfigurator.fetchColumns(callingClass, jobConf, fetchColumns);
- if (samplerConfig.isPresent())
- org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration(jobConf,
- samplerConfig.get());
- if (hints.size() > 0)
- org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints(jobConf, hints);
- org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges(jobConf,
- bools.autoAdjustRanges);
- org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation(jobConf,
- bools.scanIsolation);
- org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators(jobConf,
- bools.localIters);
- org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan(jobConf,
- bools.offlineScan);
- org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan(jobConf, bools.batchScan);
+ if (tableConfigMap.size() == 0) {
+ throw new IllegalArgumentException("At least one Table must be configured for job.");
+ }
+ // if only one table use the single table configuration method
+ if (tableConfigMap.size() == 1) {
+ Map.Entry<String,InputTableConfig> entry = tableConfigMap.entrySet().iterator().next();
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName(jobConf,
+ entry.getKey());
+ InputTableConfig config = entry.getValue();
+ if (!config.getScanAuths().isPresent())
+ config.setScanAuths(getUserAuths(clientInfo));
+ org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations(jobConf,
+ config.getScanAuths().get());
+ // all optional values
+ if (config.getContext().isPresent())
+ org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext(jobConf,
+ config.getContext().get());
+ if (config.getRanges().size() > 0)
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges(jobConf,
+ config.getRanges());
+ if (config.getIterators().size() > 0)
+ InputConfigurator.writeIteratorsToConf(callingClass, jobConf, config.getIterators());
+ if (config.getFetchedColumns().size() > 0)
+ InputConfigurator.fetchColumns(callingClass, jobConf, config.getFetchedColumns());
+ if (config.getSamplerConfiguration() != null)
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration(jobConf,
+ config.getSamplerConfiguration());
+ if (config.getExecutionHints().size() > 0)
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints(jobConf,
+ config.getExecutionHints());
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges(jobConf,
+ config.shouldAutoAdjustRanges());
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation(jobConf,
+ config.shouldUseIsolatedScanners());
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators(jobConf,
+ config.shouldUseLocalIterators());
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan(jobConf,
+ config.isOfflineScan());
+ org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan(jobConf,
+ config.shouldBatchScan());
+ } else {
+ InputConfigurator.setInputTableConfigs(callingClass, jobConf, tableConfigMap);
+ }
}
- private Authorizations getUserAuths(Authorizations scanAuths, ClientInfo clientInfo)
+ private Authorizations getUserAuths(ClientInfo clientInfo)
throws AccumuloSecurityException, AccumuloException {
- if (scanAuths != null)
- return scanAuths;
try (AccumuloClient c = Accumulo.newClient().from(clientInfo.getProperties()).build()) {
return c.securityOperations().getUserAuthorizations(clientInfo.getPrincipal());
}
}
- private static class BuilderBooleans {
- boolean autoAdjustRanges = true;
- boolean scanIsolation = false;
- boolean offlineScan = false;
- boolean localIters = false;
- boolean batchScan = false;
- }
}
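The implementation above follows a "current table cursor" pattern: table() switches currentTable and registers a fresh InputTableConfig in an insertion-ordered map, and every option method mutates the config for the current table. A hedged, self-contained sketch of the pattern (the names and the StringBuilder stand-in for InputTableConfig are hypothetical):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

class CursorBuilder {
  // stand-in for Map<String,InputTableConfig>; insertion order is preserved
  private final Map<String,StringBuilder> configs = new LinkedHashMap<>();
  private String currentTable;

  CursorBuilder table(String name) {
    currentTable = Objects.requireNonNull(name, "Table name must not be null");
    configs.put(currentTable, new StringBuilder());
    return this;
  }

  CursorBuilder option(String value) {
    configs.get(currentTable).append(value).append(';'); // applies only to the current table
    return this;
  }

  Map<String,StringBuilder> store() {
    if (configs.isEmpty())
      throw new IllegalArgumentException("At least one table must be configured");
    return configs; // the real builder serializes these into the Job/JobConf instead
  }
}

Note that the builder's interface chain (ClientParams, then TableParams, then InputFormatOptions) makes it hard to call an option before the first table() has set the cursor.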
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java
index c90c92f..9d27d6f 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java
@@ -24,15 +24,20 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -43,14 +48,19 @@ import org.apache.hadoop.io.Writable;
*/
public class InputTableConfig implements Writable {
- private List<IteratorSetting> iterators;
- private List<Range> ranges;
- private Collection<IteratorSetting.Column> columns;
+ // store iterators by name to prevent duplicates when addIterator is called
+ private Map<String,IteratorSetting> iterators = Collections.emptyMap();
+ private List<Range> ranges = Collections.emptyList();
+ private Collection<IteratorSetting.Column> columns = Collections.emptyList();
+
+ private Optional<Authorizations> scanAuths = Optional.empty();
+ private Optional<String> context = Optional.empty();
private boolean autoAdjustRanges = true;
private boolean useLocalIterators = false;
private boolean useIsolatedScanners = false;
private boolean offlineScan = false;
+ private boolean batchScan = false;
private SamplerConfiguration samplerConfig = null;
private Map<String,String> executionHints = Collections.emptyMap();
@@ -83,7 +93,7 @@ public class InputTableConfig implements Writable {
* Returns the ranges to be queried in the configuration
*/
public List<Range> getRanges() {
- return ranges != null ? ranges : new ArrayList<>();
+ return ranges;
}
/**
@@ -107,23 +117,17 @@ public class InputTableConfig implements Writable {
return columns != null ? columns : new HashSet<>();
}
- /**
- * Set iterators on to be used in the query.
- *
- * @param iterators
- * the configurations for the iterators
- * @since 1.6.0
- */
- public InputTableConfig setIterators(List<IteratorSetting> iterators) {
- this.iterators = iterators;
- return this;
+ public void addIterator(IteratorSetting cfg) {
+ if (this.iterators.isEmpty())
+ this.iterators = new LinkedHashMap<>();
+ this.iterators.put(cfg.getName(), cfg);
}
/**
* Returns the iterators to be set on this configuration
*/
public List<IteratorSetting> getIterators() {
- return iterators != null ? iterators : new ArrayList<>();
+ return new LinkedList<>(iterators.values());
}
/**
@@ -217,7 +221,6 @@ public class InputTableConfig implements Writable {
*
* @param offlineScan
* the feature is enabled if true, disabled otherwise
- * @since 1.6.0
*/
public InputTableConfig setOfflineScan(boolean offlineScan) {
this.offlineScan = offlineScan;
@@ -228,7 +231,6 @@ public class InputTableConfig implements Writable {
* Determines whether a configuration has the offline table scan feature enabled.
*
* @return true if the feature is enabled, false otherwise
- * @since 1.6.0
* @see #setOfflineScan(boolean)
*/
public boolean isOfflineScan() {
@@ -243,7 +245,6 @@ public class InputTableConfig implements Writable {
*
* @param useIsolatedScanners
* the feature is enabled if true, disabled otherwise
- * @since 1.6.0
*/
public InputTableConfig setUseIsolatedScanners(boolean useIsolatedScanners) {
this.useIsolatedScanners = useIsolatedScanners;
@@ -254,55 +255,68 @@ public class InputTableConfig implements Writable {
* Determines whether a configuration has isolation enabled.
*
* @return true if the feature is enabled, false otherwise
- * @since 1.6.0
* @see #setUseIsolatedScanners(boolean)
*/
public boolean shouldUseIsolatedScanners() {
return useIsolatedScanners;
}
+ public void setUseBatchScan(boolean value) {
+ this.batchScan = value;
+ }
+
+ public boolean shouldBatchScan() {
+ return batchScan;
+ }
+
/**
* Set the sampler configuration to use when reading from the data.
*
* @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
* @see InputFormatBase#setSamplerConfiguration(org.apache.hadoop.mapreduce.Job,
* SamplerConfiguration)
- *
- * @since 1.8.0
*/
public void setSamplerConfiguration(SamplerConfiguration samplerConfiguration) {
this.samplerConfig = samplerConfiguration;
}
- /**
- *
- * @since 1.8.0
- */
public SamplerConfiguration getSamplerConfiguration() {
return samplerConfig;
}
/**
* The execution hints to set on created scanners. See {@link ScannerBase#setExecutionHints(Map)}
- *
- * @since 2.0.0
*/
public void setExecutionHints(Map<String,String> executionHints) {
this.executionHints = executionHints;
}
- /**
- * @since 2.0.0
- */
public Map<String,String> getExecutionHints() {
return executionHints;
}
+ public Optional<Authorizations> getScanAuths() {
+ return scanAuths;
+ }
+
+ public InputTableConfig setScanAuths(Authorizations scanAuths) {
+ this.scanAuths = Optional.of(scanAuths);
+ return this;
+ }
+
+ public Optional<String> getContext() {
+ return context;
+ }
+
+ public void setContext(String context) {
+ this.context = Optional.of(context);
+ }
+
@Override
public void write(DataOutput dataOutput) throws IOException {
if (iterators != null) {
dataOutput.writeInt(iterators.size());
- for (IteratorSetting setting : iterators)
+ for (IteratorSetting setting : getIterators())
setting.write(dataOutput);
} else {
dataOutput.writeInt(0);
@@ -340,7 +354,7 @@ public class InputTableConfig implements Writable {
new SamplerConfigurationImpl(samplerConfig).write(dataOutput);
}
- if (executionHints == null || executionHints.size() == 0) {
+ if (executionHints.isEmpty()) {
dataOutput.writeInt(0);
} else {
dataOutput.writeInt(executionHints.size());
@@ -356,9 +370,11 @@ public class InputTableConfig implements Writable {
// load iterators
long iterSize = dataInput.readInt();
if (iterSize > 0)
- iterators = new ArrayList<>();
- for (int i = 0; i < iterSize; i++)
- iterators.add(new IteratorSetting(dataInput));
+ iterators = new LinkedHashMap<>();
+ for (int i = 0; i < iterSize; i++) {
+ IteratorSetting newIter = new IteratorSetting(dataInput);
+ iterators.put(newIter.getName(), newIter);
+ }
// load ranges
long rangeSize = dataInput.readInt();
if (rangeSize > 0)
@@ -419,17 +435,15 @@ public class InputTableConfig implements Writable {
return false;
if (useLocalIterators != that.useLocalIterators)
return false;
- if (columns != null ? !columns.equals(that.columns) : that.columns != null)
+ if (!Objects.equals(columns, that.columns))
return false;
- if (iterators != null ? !iterators.equals(that.iterators) : that.iterators != null)
+ if (!Objects.equals(iterators, that.iterators))
return false;
- if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null)
+ if (!Objects.equals(ranges, that.ranges))
return false;
- if (executionHints != null ? !executionHints.equals(that.executionHints)
- : that.executionHints != null)
+ if (!Objects.equals(executionHints, that.executionHints))
return false;
- return samplerConfig != null ? samplerConfig.equals(that.samplerConfig)
- : that.samplerConfig == null;
+ return Objects.equals(samplerConfig, that.samplerConfig);
}
@Override
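A behavioral consequence of keying iterators by name, as the map-backed storage above does, is that re-adding an iterator under an existing name replaces the earlier setting instead of duplicating it, while insertion order is otherwise preserved. A small hedged demo of that behavior (class and iterator names are made up):

import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.accumulo.core.client.IteratorSetting;

public class IteratorDedupeDemo {
  public static void main(String[] args) {
    // mirrors InputTableConfig's internal storage: iterator name -> setting, insertion-ordered
    Map<String,IteratorSetting> iterators = new LinkedHashMap<>();
    iterators.put("iter1", new IteratorSetting(50, "iter1", "classA"));
    iterators.put("iter2", new IteratorSetting(55, "iter2", "classB"));
    iterators.put("iter1", new IteratorSetting(60, "iter1", "classC")); // replaces the first iter1
    List<IteratorSetting> result = new LinkedList<>(iterators.values()); // like getIterators()
    System.out.println(result.size()); // prints 2: iter1 (now classC) then iter2
  }
}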
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
index f52b427..e1da2b9 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
@@ -780,7 +780,7 @@ public class InputConfigurator extends ConfiguratorBase {
InputTableConfig queryConfig = new InputTableConfig();
List<IteratorSetting> itrs = getIterators(implementingClass, conf);
if (itrs != null)
- queryConfig.setIterators(itrs);
+ itrs.forEach(itr -> queryConfig.addIterator(itr));
Set<IteratorSetting.Column> columns = getFetchedColumns(implementingClass, conf);
if (columns != null)
queryConfig.fetchColumns(columns);
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/MultiTableInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/MultiTableInputFormatIT.java
new file mode 100644
index 0000000..95244fe
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/MultiTableInputFormatIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoopImpl.mapred.RangeInputSplit;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class MultiTableInputFormatIT extends AccumuloClusterHarness {
+
+ private static AssertionError e1 = null;
+ private static AssertionError e2 = null;
+
+ private static class MRTester extends Configured implements Tool {
+ private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+ Key key = null;
+ int count = 0;
+
+ @Override
+ public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter)
+ throws IOException {
+ try {
+ String tableName = ((RangeInputSplit) reporter.getInputSplit()).getTableName();
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+ assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ public void configure(JobConf job) {}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ assertEquals(100, count);
+ } catch (AssertionError e) {
+ e2 = e;
+ }
+ }
+
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 2) {
+ throw new IllegalArgumentException(
+ "Usage : " + MRTester.class.getName() + " <table1> <table2>");
+ }
+
+ String table1 = args[0];
+ String table2 = args[1];
+
+ JobConf job = new JobConf(getConf());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormat(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.configure().clientProperties(getClientInfo().getProperties())
+ .table(table1).table(table2).store(job);
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormat(NullOutputFormat.class);
+
+ job.setNumReduceTasks(0);
+
+ return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.framework.name", "local");
+ conf.set("mapreduce.cluster.local.dir",
+ new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+ }
+ }
+
+ @Test
+ public void testMap() throws Exception {
+ String[] tableNames = getUniqueNames(2);
+ String table1 = tableNames[0];
+ String table2 = tableNames[1];
+ try (AccumuloClient c = createAccumuloClient()) {
+ c.tableOperations().create(table1);
+ c.tableOperations().create(table2);
+ BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+ BatchWriter bw2 = c.createBatchWriter(table2, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation t1m = new Mutation(new Text(String.format("%s_%09x", table1, i + 1)));
+ t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table1, i).getBytes()));
+ bw.addMutation(t1m);
+ Mutation t2m = new Mutation(new Text(String.format("%s_%09x", table2, i + 1)));
+ t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table2, i).getBytes()));
+ bw2.addMutation(t2m);
+ }
+ bw.close();
+ bw2.close();
+
+ MRTester.main(new String[] {table1, table2});
+ assertNull(e1);
+ assertNull(e2);
+ }
+ }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/MultiTableInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/MultiTableInputFormatIT.java
new file mode 100644
index 0000000..07349dd
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/MultiTableInputFormatIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class MultiTableInputFormatIT extends AccumuloClusterHarness {
+
+ private static AssertionError e1 = null;
+ private static AssertionError e2 = null;
+
+ private static class MRTester extends Configured implements Tool {
+
+ private static class TestMapper extends Mapper<Key,Value,Key,Value> {
+ Key key = null;
+ int count = 0;
+
+ @Override
+ protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+ try {
+ String tableName = ((RangeInputSplit) context.getInputSplit()).getTableName();
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+ assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ try {
+ assertEquals(100, count);
+ } catch (AssertionError e) {
+ e2 = e;
+ }
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 2) {
+ throw new IllegalArgumentException(
+ "Usage : " + MRTester.class.getName() + " <table1> <table2>");
+ }
+
+ String table1 = args[0];
+ String table2 = args[1];
+
+ Job job = Job.getInstance(getConf(),
+ this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.configure().clientProperties(getClientInfo().getProperties())
+ .table(table1).table(table2).store(job);
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.framework.name", "local");
+ conf.set("mapreduce.cluster.local.dir",
+ new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+ }
+ }
+
+ /**
+ * Generate incrementing counts and attach table name to the key/value so that order and
+ * multi-table data can be verified.
+ */
+ @Test
+ public void testMap() throws Exception {
+ String[] tableNames = getUniqueNames(2);
+ String table1 = tableNames[0];
+ String table2 = tableNames[1];
+ try (AccumuloClient c = createAccumuloClient()) {
+ c.tableOperations().create(table1);
+ c.tableOperations().create(table2);
+ BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+ BatchWriter bw2 = c.createBatchWriter(table2, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation t1m = new Mutation(new Text(String.format("%s_%09x", table1, i + 1)));
+ t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table1, i).getBytes()));
+ bw.addMutation(t1m);
+ Mutation t2m = new Mutation(new Text(String.format("%s_%09x", table2, i + 1)));
+ t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table2, i).getBytes()));
+ bw2.addMutation(t2m);
+ }
+ bw.close();
+ bw2.close();
+
+ MRTester.main(new String[] {table1, table2});
+ assertNull(e1);
+ assertNull(e2);
+ }
+ }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
index 2401477..65e1ddf 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
@@ -40,6 +40,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
public class AccumuloInputFormatTest {
@@ -49,6 +50,8 @@ public class AccumuloInputFormatTest {
@Rule
public TestName test = new TestName();
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
@Before
public void createJob() {
@@ -61,6 +64,14 @@ public class AccumuloInputFormatTest {
.setupClientProperties();
}
+ @Test
+ public void testMissingTable() throws Exception {
+ Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+ .setupClientProperties();
+ exception.expect(IllegalArgumentException.class);
+ AccumuloInputFormat.configure().clientProperties(clientProps).store(new JobConf());
+ }
+
/**
* Check that the iterator configuration is getting stored in the Job conf correctly.
*/
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/MultiTableInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/MultiTableInputFormatTest.java
new file mode 100644
index 0000000..775596a
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/MultiTableInputFormatTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapred;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class MultiTableInputFormatTest {
+ public static final Class<AccumuloInputFormat> CLASS = AccumuloInputFormat.class;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ /**
+ * Verify {@link InputTableConfig} objects get correctly serialized in the JobContext.
+ */
+ @Test
+ public void testStoreTables() throws Exception {
+ String table1Name = testName.getMethodName() + "1";
+ String table2Name = testName.getMethodName() + "2";
+ JobConf job = new JobConf();
+ Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+ .setupClientProperties();
+ List<Range> ranges = singletonList(new Range("a", "b"));
+ Set<IteratorSetting.Column> cols = singleton(
+ new IteratorSetting.Column(new Text("CF1"), new Text("CQ1")));
+ IteratorSetting iter1 = new IteratorSetting(50, "iter1", "iterclass1");
+ IteratorSetting iter2 = new IteratorSetting(60, "iter2", "iterclass2");
+ List<IteratorSetting> allIters = new ArrayList<>();
+ allIters.add(iter1);
+ allIters.add(iter2);
+
+ // if auths are not set, the client will try to get them from the server; we don't want that here
+ Authorizations auths = Authorizations.EMPTY;
+
+ // @formatter:off
+ AccumuloInputFormat.configure().clientProperties(clientProps)
+ .table(table1Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter1)
+ .addIterator(iter2).localIterators(true).offlineScan(true) // end table 1
+ .table(table2Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter2) // end
+ .store(job);
+ // @formatter:on
+
+ InputTableConfig table1 = new InputTableConfig();
+ table1.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).setUseLocalIterators(true)
+ .setOfflineScan(true);
+ allIters.forEach(itr -> table1.addIterator(itr));
+ InputTableConfig table2 = new InputTableConfig();
+ table2.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter2);
+
+ assertEquals(table1, InputConfigurator.getInputTableConfig(CLASS, job, table1Name));
+ assertEquals(table2, InputConfigurator.getInputTableConfig(CLASS, job, table2Name));
+ }
+
+ @Test
+ public void testManyTables() throws Exception {
+ JobConf job = new JobConf();
+ Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+ .setupClientProperties();
+
+ // if auths are not set, the client will try to get them from the server; we don't want that here
+ Authorizations auths = Authorizations.EMPTY;
+
+ // set the client properties once then loop over tables
+ InputFormatBuilder.TableParams<JobConf> opts = AccumuloInputFormat.configure()
+ .clientProperties(clientProps);
+ for (int i = 0; i < 10_000; i++) {
+ List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+ Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+ IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+ opts.table("table" + i).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter);
+ }
+ opts.store(job);
+
+ // verify
+ Map<String,InputTableConfig> configs = InputConfigurator.getInputTableConfigs(CLASS, job);
+ assertEquals(10_000, configs.size());
+
+ // create objects to test against
+ for (int i = 0; i < 10_000; i++) {
+ InputTableConfig t = new InputTableConfig();
+ List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+ Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+ IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+ t.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter);
+ assertEquals(t, configs.get("table" + i));
+ }
+ }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
index 227eb84..ac22733 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
@@ -39,9 +39,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
public class AccumuloInputFormatTest {
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
static Properties clientProperties;
@BeforeClass
@@ -59,6 +64,14 @@ public class AccumuloInputFormatTest {
return cp;
}
+ @Test
+ public void testMissingTable() throws Exception {
+ Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+ .setupClientProperties();
+ exception.expect(IllegalArgumentException.class);
+ AccumuloInputFormat.configure().clientProperties(clientProps).store(Job.getInstance());
+ }
+
/**
* Check that the iterator configuration is getting stored in the Job conf correctly.
*/
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/MultiTableInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/MultiTableInputFormatTest.java
new file mode 100644
index 0000000..8cd353b
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/MultiTableInputFormatTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class MultiTableInputFormatTest {
+ public static final Class<AccumuloInputFormat> CLASS = AccumuloInputFormat.class;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ /**
+ * Verify {@link InputTableConfig} objects get correctly serialized in the JobContext.
+ */
+ @Test
+ public void testStoreTables() throws Exception {
+ String table1Name = testName.getMethodName() + "1";
+ String table2Name = testName.getMethodName() + "2";
+ Job job = Job.getInstance();
+ Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+ .setupClientProperties();
+ List<Range> ranges = singletonList(new Range("a", "b"));
+ Set<IteratorSetting.Column> cols = singleton(
+ new IteratorSetting.Column(new Text("CF1"), new Text("CQ1")));
+ IteratorSetting iter1 = new IteratorSetting(50, "iter1", "iterclass1");
+ IteratorSetting iter2 = new IteratorSetting(60, "iter2", "iterclass2");
+ List<IteratorSetting> allIters = new ArrayList<>();
+ allIters.add(iter1);
+ allIters.add(iter2);
+
+ // if auths are not set, the client will try to get them from the server; we don't want that here
+ Authorizations auths = Authorizations.EMPTY;
+
+ // @formatter:off
+ AccumuloInputFormat.configure().clientProperties(clientProps)
+ .table(table1Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter1)
+ .addIterator(iter2).localIterators(true).offlineScan(true) // end table 1
+ .table(table2Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter2) // end
+ .store(job);
+ // @formatter:on
+
+ InputTableConfig table1 = new InputTableConfig();
+ table1.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).setUseLocalIterators(true)
+ .setOfflineScan(true);
+ allIters.forEach(itr -> table1.addIterator(itr));
+ InputTableConfig table2 = new InputTableConfig();
+ table2.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter2);
+
+ Configuration jc = job.getConfiguration();
+ assertEquals(table1, InputConfigurator.getInputTableConfig(CLASS, jc, table1Name));
+ assertEquals(table2, InputConfigurator.getInputTableConfig(CLASS, jc, table2Name));
+ }
+
+ @Test
+ public void testManyTables() throws Exception {
+ Job job = Job.getInstance();
+ Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+ .setupClientProperties();
+
+ // if auths are not set, the client will try to get them from the server; we don't want that here
+ Authorizations auths = Authorizations.EMPTY;
+
+ // set the client properties once then loop over tables
+ InputFormatBuilder.TableParams<Job> opts = AccumuloInputFormat.configure()
+ .clientProperties(clientProps);
+ for (int i = 0; i < 10_000; i++) {
+ List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+ Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+ IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+ opts.table("table" + i).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter);
+ }
+ opts.store(job);
+
+ // verify
+ Map<String,InputTableConfig> configs = InputConfigurator.getInputTableConfigs(CLASS,
+ job.getConfiguration());
+ assertEquals(10_000, configs.size());
+
+ // create objects to test against
+ for (int i = 0; i < 10_000; i++) {
+ InputTableConfig t = new InputTableConfig();
+ List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+ Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+ IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+ t.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter);
+ assertEquals(t, configs.get("table" + i));
+ }
+ }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java
index 0d25fee..74ab698 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java
@@ -97,7 +97,7 @@ public class InputTableConfigTest {
List<IteratorSetting> settings = new ArrayList<>();
settings.add(new IteratorSetting(50, "iter", "iterclass"));
settings.add(new IteratorSetting(55, "iter2", "iterclass2"));
- tableQueryConfig.setIterators(settings);
+ settings.forEach(itr -> tableQueryConfig.addIterator(itr));
byte[] serialized = serialize(tableQueryConfig);
InputTableConfig actualConfig = deserialize(serialized);
assertEquals(actualConfig.getIterators(), settings);