You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/12/12 01:27:07 UTC
[2/4] git commit: ACCUMULO-1992 Remove CachedConfiguration from examples
ACCUMULO-1992 Remove CachedConfiguration from examples
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7b7521dd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7b7521dd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7b7521dd
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 7b7521dd92b81f4c91eae2415f6d835944b15355
Parents: 5f90d0b
Author: Christopher Tubbs <ct...@apache.org>
Authored: Mon Dec 9 13:22:21 2013 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Wed Dec 11 16:59:48 2013 -0500
----------------------------------------------------------------------
.../simple/filedata/CharacterHistogram.java | 28 +++---
.../examples/simple/mapreduce/NGramIngest.java | 36 ++++----
.../examples/simple/mapreduce/RegexExample.java | 24 ++---
.../examples/simple/mapreduce/RowHash.java | 22 ++---
.../examples/simple/mapreduce/TableToFile.java | 28 +++---
.../simple/mapreduce/TeraSortIngest.java | 97 ++++++++++----------
.../simple/mapreduce/TokenFileWordCount.java | 33 +++----
.../simple/mapreduce/UniqueColumns.java | 44 ++++-----
.../examples/simple/mapreduce/WordCount.java | 29 +++---
.../mapreduce/bulk/BulkIngestExample.java | 51 +++++-----
.../simple/mapreduce/bulk/GenerateTestData.java | 20 ++--
.../simple/filedata/ChunkInputFormatTest.java | 58 ++++++------
12 files changed, 235 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/CharacterHistogram.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/CharacterHistogram.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/CharacterHistogram.java
index 11eda3e..d0662b6 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/CharacterHistogram.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/CharacterHistogram.java
@@ -29,8 +29,8 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.SummingArrayCombiner;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.examples.simple.mapreduce.JobUtil;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -46,14 +46,15 @@ import com.beust.jcommander.Parameter;
*/
public class CharacterHistogram extends Configured implements Tool {
public static final String VIS = "vis";
-
+
public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(CachedConfiguration.getInstance(), new CharacterHistogram(), args));
+ System.exit(ToolRunner.run(new Configuration(), new CharacterHistogram(), args));
}
-
+
public static class HistMapper extends Mapper<List<Entry<Key,Value>>,InputStream,Text,Mutation> {
private ColumnVisibility cv;
-
+
+ @Override
public void map(List<Entry<Key,Value>> k, InputStream v, Context context) throws IOException, InterruptedException {
Long[] hist = new Long[256];
for (int i = 0; i < hist.length; i++)
@@ -68,19 +69,18 @@ public class CharacterHistogram extends Configured implements Tool {
m.put("info", "hist", cv, new Value(SummingArrayCombiner.STRING_ARRAY_ENCODER.encode(Arrays.asList(hist))));
context.write(new Text(), m);
}
-
+
@Override
protected void setup(Context context) throws IOException, InterruptedException {
cv = new ColumnVisibility(context.getConfiguration().get(VIS, ""));
}
}
-
+
static class Opts extends ClientOnRequiredTable {
- @Parameter(names="--vis")
+ @Parameter(names = "--vis")
String visibilities = "";
}
-
-
+
@Override
public int run(String[] args) throws Exception {
Job job = JobUtil.getJob(getConf());
@@ -93,15 +93,15 @@ public class CharacterHistogram extends Configured implements Tool {
job.setInputFormatClass(ChunkInputFormat.class);
opts.setAccumuloConfigs(job);
job.getConfiguration().set(VIS, opts.visibilities.toString());
-
+
job.setMapperClass(HistMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Mutation.class);
-
+
job.setNumReduceTasks(0);
-
+
job.setOutputFormatClass(AccumuloOutputFormat.class);
-
+
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
index 2f9b01a..93b589d 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
@@ -24,7 +24,7 @@ import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -39,19 +39,18 @@ import org.apache.log4j.Logger;
import com.beust.jcommander.Parameter;
/**
- * Map job to ingest n-gram files from
- * http://storage.googleapis.com/books/ngrams/books/datasetsv2.html
+ * Map job to ingest n-gram files from http://storage.googleapis.com/books/ngrams/books/datasetsv2.html
*/
-public class NGramIngest extends Configured implements Tool {
-
+public class NGramIngest extends Configured implements Tool {
+
private static final Logger log = Logger.getLogger(NGramIngest.class);
-
-
+
static class Opts extends ClientOnRequiredTable {
- @Parameter(names = "--input", required=true)
+ @Parameter(names = "--input", required = true)
String inputDirectory;
}
- static class NGramMapper extends Mapper<LongWritable, Text, Text, Mutation> {
+
+ static class NGramMapper extends Mapper<LongWritable,Text,Text,Mutation> {
@Override
protected void map(LongWritable location, Text value, Context context) throws IOException, InterruptedException {
@@ -75,19 +74,18 @@ public class NGramIngest extends Configured implements Tool {
Job job = JobUtil.getJob(getConf());
job.setJobName(getClass().getSimpleName());
job.setJarByClass(getClass());
-
+
opts.setAccumuloConfigs(job);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(AccumuloOutputFormat.class);
-
+
job.setMapperClass(NGramMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Mutation.class);
-
+
job.setNumReduceTasks(0);
job.setSpeculativeExecution(false);
-
-
+
if (!opts.getConnector().tableOperations().exists(opts.tableName)) {
log.info("Creating table " + opts.tableName);
opts.getConnector().tableOperations().create(opts.tableName);
@@ -95,23 +93,23 @@ public class NGramIngest extends Configured implements Tool {
String numbers[] = "1 2 3 4 5 6 7 8 9".split("\\s");
String lower[] = "a b c d e f g h i j k l m n o p q r s t u v w x y z".split("\\s");
String upper[] = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z".split("\\s");
- for (String[] array : new String[][]{numbers, lower, upper}) {
+ for (String[] array : new String[][] {numbers, lower, upper}) {
for (String s : array) {
splits.add(new Text(s));
}
}
opts.getConnector().tableOperations().addSplits(opts.tableName, splits);
}
-
+
TextInputFormat.addInputPath(job, new Path(opts.inputDirectory));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
-
+
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(CachedConfiguration.getInstance(), new NGramIngest(), args);
+ int res = ToolRunner.run(new Configuration(), new NGramIngest(), args);
if (res != 0)
System.exit(res);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java
index 9acc694..47e5879 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java
@@ -24,7 +24,7 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -42,7 +42,7 @@ public class RegexExample extends Configured implements Tool {
context.write(row, data);
}
}
-
+
static class Opts extends ClientOnRequiredTable {
@Parameter(names = "--rowRegex")
String rowRegex;
@@ -55,7 +55,7 @@ public class RegexExample extends Configured implements Tool {
@Parameter(names = "--output", required = true)
String destination;
}
-
+
@Override
public int run(String[] args) throws Exception {
Opts opts = new Opts();
@@ -64,34 +64,34 @@ public class RegexExample extends Configured implements Tool {
Job job = JobUtil.getJob(getConf());
job.setJobName(getClass().getSimpleName());
job.setJarByClass(getClass());
-
+
job.setInputFormatClass(AccumuloInputFormat.class);
opts.setAccumuloConfigs(job);
-
+
IteratorSetting regex = new IteratorSetting(50, "regex", RegExFilter.class);
RegExFilter.setRegexs(regex, opts.rowRegex, opts.columnFamilyRegex, opts.columnQualifierRegex, opts.valueRegex, false);
AccumuloInputFormat.addIterator(job, regex);
-
+
job.setMapperClass(RegexMapper.class);
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Value.class);
-
+
job.setNumReduceTasks(0);
-
+
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(opts.destination));
-
+
System.out.println("setRowRegex: " + opts.rowRegex);
System.out.println("setColumnFamilyRegex: " + opts.columnFamilyRegex);
System.out.println("setColumnQualifierRegex: " + opts.columnQualifierRegex);
System.out.println("setValueRegex: " + opts.valueRegex);
-
+
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
-
+
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(CachedConfiguration.getInstance(), new RegexExample(), args);
+ int res = ToolRunner.run(new Configuration(), new RegexExample(), args);
if (res != 0)
System.exit(res);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java
index 2ca3587..1fa9b8f 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java
@@ -25,9 +25,9 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Pair;
import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
@@ -50,16 +50,16 @@ public class RowHash extends Configured implements Tool {
context.write(null, m);
context.progress();
}
-
+
@Override
public void setup(Context job) {}
}
-
+
private static class Opts extends ClientOnRequiredTable {
@Parameter(names = "--column", required = true)
String column = null;
}
-
+
@Override
public int run(String[] args) throws Exception {
Job job = JobUtil.getJob(getConf());
@@ -69,27 +69,27 @@ public class RowHash extends Configured implements Tool {
opts.parseArgs(RowHash.class.getName(), args);
job.setInputFormatClass(AccumuloInputFormat.class);
opts.setAccumuloConfigs(job);
-
+
String col = opts.column;
int idx = col.indexOf(":");
Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
if (cf.getLength() > 0)
AccumuloInputFormat.fetchColumns(job, Collections.singleton(new Pair<Text,Text>(cf, cq)));
-
+
job.setMapperClass(HashDataMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Mutation.class);
-
+
job.setNumReduceTasks(0);
-
+
job.setOutputFormatClass(AccumuloOutputFormat.class);
-
+
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
-
+
public static void main(String[] args) throws Exception {
- ToolRunner.run(CachedConfiguration.getInstance(), new RowHash(), args);
+ ToolRunner.run(new Configuration(), new RowHash(), args);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java
index 8bdc195..3a211e2 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java
@@ -25,9 +25,9 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
@@ -45,14 +45,14 @@ import com.beust.jcommander.Parameter;
* <tablename> <column> <hdfs-output-path>
*/
public class TableToFile extends Configured implements Tool {
-
+
static class Opts extends ClientOnRequiredTable {
@Parameter(names = "--output", description = "output directory", required = true)
String output;
@Parameter(names = "--columns", description = "columns to extract, in cf:cq{,cf:cq,...} form")
String columns = "";
}
-
+
/**
* The Mapper class that given a row number, will generate the appropriate output line.
*/
@@ -66,12 +66,12 @@ public class TableToFile extends Configured implements Tool {
public Key getKey() {
return r;
}
-
+
@Override
public Value getValue() {
return v;
}
-
+
@Override
public Value setValue(Value value) {
return null;
@@ -81,7 +81,7 @@ public class TableToFile extends Configured implements Tool {
context.setStatus("Outputed Value");
}
}
-
+
@Override
public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException {
Job job = JobUtil.getJob(getConf());
@@ -89,10 +89,10 @@ public class TableToFile extends Configured implements Tool {
job.setJarByClass(this.getClass());
Opts opts = new Opts();
opts.parseArgs(getClass().getName(), args);
-
+
job.setInputFormatClass(AccumuloInputFormat.class);
opts.setAccumuloConfigs(job);
-
+
HashSet<Pair<Text,Text>> columnsToFetch = new HashSet<Pair<Text,Text>>();
for (String col : opts.columns.split(",")) {
int idx = col.indexOf(":");
@@ -103,20 +103,20 @@ public class TableToFile extends Configured implements Tool {
}
if (!columnsToFetch.isEmpty())
AccumuloInputFormat.fetchColumns(job, columnsToFetch);
-
+
job.setMapperClass(TTFMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
-
+
job.setNumReduceTasks(0);
-
+
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(opts.output));
-
+
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
-
+
/**
*
* @param args
@@ -124,6 +124,6 @@ public class TableToFile extends Configured implements Tool {
* @throws Exception
*/
public static void main(String[] args) throws Exception {
- ToolRunner.run(CachedConfiguration.getInstance(), new TableToFile(), args);
+ ToolRunner.run(new Configuration(), new TableToFile(), args);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
index dd2fea4..f9f2d39 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
@@ -30,7 +30,6 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
@@ -77,37 +76,37 @@ public class TeraSortIngest extends Configured implements Tool {
static class RangeInputSplit extends InputSplit implements Writable {
long firstRow;
long rowCount;
-
+
public RangeInputSplit() {}
-
+
public RangeInputSplit(long offset, long length) {
firstRow = offset;
rowCount = length;
}
-
+
@Override
public long getLength() throws IOException {
return 0;
}
-
+
@Override
public String[] getLocations() throws IOException {
return new String[] {};
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
firstRow = WritableUtils.readVLong(in);
rowCount = WritableUtils.readVLong(in);
}
-
+
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, firstRow);
WritableUtils.writeVLong(out, rowCount);
}
}
-
+
/**
* A record reader that will generate a range of numbers.
*/
@@ -115,36 +114,36 @@ public class TeraSortIngest extends Configured implements Tool {
long startRow;
long finishedRows;
long totalRows;
-
+
LongWritable currentKey;
-
+
public RangeRecordReader(RangeInputSplit split) {
startRow = split.firstRow;
finishedRows = 0;
totalRows = split.rowCount;
}
-
+
@Override
public void close() throws IOException {}
-
+
@Override
public float getProgress() throws IOException {
return finishedRows / (float) totalRows;
}
-
+
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return new LongWritable(startRow + finishedRows);
}
-
+
@Override
public NullWritable getCurrentValue() throws IOException, InterruptedException {
return NullWritable.get();
}
-
+
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {}
-
+
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (finishedRows < totalRows) {
@@ -154,13 +153,13 @@ public class TeraSortIngest extends Configured implements Tool {
return false;
}
}
-
+
@Override
public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
// reporter.setStatus("Creating record reader");
return new RangeRecordReader((RangeInputSplit) split);
}
-
+
/**
* Create the desired number of splits, dividing the number of rows between the mappers.
*/
@@ -180,12 +179,12 @@ public class TeraSortIngest extends Configured implements Tool {
System.out.println("Done Generating.");
return splits;
}
-
+
}
-
+
private static String NUMSPLITS = "terasort.overridesplits";
private static String NUMROWS = "terasort.numrows";
-
+
static class RandomGenerator {
private long seed = 0;
private static final long mask32 = (1l << 32) - 1;
@@ -199,7 +198,7 @@ public class TeraSortIngest extends Configured implements Tool {
private static final long[] seeds = new long[] {0L, 4160749568L, 4026531840L, 3892314112L, 3758096384L, 3623878656L, 3489660928L, 3355443200L, 3221225472L,
3087007744L, 2952790016L, 2818572288L, 2684354560L, 2550136832L, 2415919104L, 2281701376L, 2147483648L, 2013265920L, 1879048192L, 1744830464L,
1610612736L, 1476395008L, 1342177280L, 1207959552L, 1073741824L, 939524096L, 805306368L, 671088640L, 536870912L, 402653184L, 268435456L, 134217728L,};
-
+
/**
* Start the random number generator on the given iteration.
*
@@ -213,17 +212,17 @@ public class TeraSortIngest extends Configured implements Tool {
next();
}
}
-
+
RandomGenerator() {
this(0);
}
-
+
long next() {
seed = (seed * 3141592621l + 663896637) & mask32;
return seed;
}
}
-
+
/**
* The Mapper class that given a row number, will generate the appropriate output line.
*/
@@ -233,7 +232,7 @@ public class TeraSortIngest extends Configured implements Tool {
private int maxkeylength = 0;
private int minvaluelength = 0;
private int maxvaluelength = 0;
-
+
private Text key = new Text();
private Text value = new Text();
private RandomGenerator rand;
@@ -248,18 +247,18 @@ public class TeraSortIngest extends Configured implements Tool {
}
}
}
-
+
/**
* Add a random key to the text
*/
private Random random = new Random();
-
+
private void addKey() {
int range = random.nextInt(maxkeylength - minkeylength + 1);
int keylen = range + minkeylength;
int keyceil = keylen + (4 - (keylen % 4));
keyBytes = new byte[keyceil];
-
+
long temp = 0;
for (int i = 0; i < keyceil / 4; i++) {
temp = rand.next() / 52;
@@ -273,7 +272,7 @@ public class TeraSortIngest extends Configured implements Tool {
}
key.set(keyBytes, 0, keylen);
}
-
+
/**
* Add the rowid to the row.
*
@@ -289,7 +288,7 @@ public class TeraSortIngest extends Configured implements Tool {
paddedRowIdString.append(rowid, 0, Math.min(rowid.length, 10));
return paddedRowIdString;
}
-
+
/**
* Add the required filler bytes. Each row consists of 7 blocks of 10 characters and 1 block of 8 characters.
*
@@ -298,22 +297,22 @@ public class TeraSortIngest extends Configured implements Tool {
*/
private void addFiller(long rowId) {
int base = (int) ((rowId * 8) % 26);
-
+
// Get Random var
Random random = new Random(rand.seed);
-
+
int range = random.nextInt(maxvaluelength - minvaluelength + 1);
int valuelen = range + minvaluelength;
-
+
while (valuelen > 10) {
value.append(filler[(base + valuelen) % 26], 0, 10);
valuelen -= 10;
}
-
+
if (valuelen > 0)
value.append(filler[(base + valuelen) % 26], 0, valuelen);
}
-
+
@Override
public void map(LongWritable row, NullWritable ignored, Context context) throws IOException, InterruptedException {
context.setStatus("Entering");
@@ -326,18 +325,18 @@ public class TeraSortIngest extends Configured implements Tool {
value.clear();
// addRowId(rowId);
addFiller(rowId);
-
+
// New
Mutation m = new Mutation(key);
m.put(new Text("c"), // column family
getRowIdString(rowId), // column qual
new Value(value.toString().getBytes())); // data
-
+
context.setStatus("About to add to accumulo");
context.write(table, m);
context.setStatus("Added to accumulo " + key.toString());
}
-
+
@Override
public void setup(Context job) {
minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0);
@@ -347,11 +346,11 @@ public class TeraSortIngest extends Configured implements Tool {
table = new Text(job.getConfiguration().get("cloudgen.tablename"));
}
}
-
+
public static void main(String[] args) throws Exception {
- ToolRunner.run(CachedConfiguration.getInstance(), new TeraSortIngest(), args);
+ ToolRunner.run(new Configuration(), new TeraSortIngest(), args);
}
-
+
static class Opts extends ClientOnRequiredTable {
@Parameter(names = "--count", description = "number of rows to ingest", required = true)
long numRows;
@@ -366,7 +365,7 @@ public class TeraSortIngest extends Configured implements Tool {
@Parameter(names = "--splits", description = "number of splits to create in the table")
int splits = 0;
}
-
+
@Override
public int run(String[] args) throws Exception {
Job job = JobUtil.getJob(getConf());
@@ -374,19 +373,19 @@ public class TeraSortIngest extends Configured implements Tool {
job.setJarByClass(this.getClass());
Opts opts = new Opts();
opts.parseArgs(TeraSortIngest.class.getName(), args);
-
+
job.setInputFormatClass(RangeInputFormat.class);
job.setMapperClass(SortGenMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Mutation.class);
-
+
job.setNumReduceTasks(0);
-
+
job.setOutputFormatClass(AccumuloOutputFormat.class);
opts.setAccumuloConfigs(job);
BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(10L * 1000 * 1000);
AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
-
+
Configuration conf = job.getConfiguration();
conf.setLong(NUMROWS, opts.numRows);
conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
@@ -394,10 +393,10 @@ public class TeraSortIngest extends Configured implements Tool {
conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
conf.set("cloudgen.tablename", opts.tableName);
-
+
if (args.length > 10)
conf.setInt(NUMSPLITS, opts.splits);
-
+
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TokenFileWordCount.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TokenFileWordCount.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TokenFileWordCount.java
index fc4b27f..c3f6cdb 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TokenFileWordCount.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TokenFileWordCount.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -38,17 +38,17 @@ import org.apache.hadoop.util.ToolRunner;
*
*/
public class TokenFileWordCount extends Configured implements Tool {
-
+
public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
@Override
public void map(LongWritable key, Text value, Context output) throws IOException {
String[] words = value.toString().split("\\s+");
-
+
for (String word : words) {
-
+
Mutation mutation = new Mutation(new Text(word));
mutation.put(new Text("count"), new Text("20080906"), new Value("1".getBytes()));
-
+
try {
output.write(null, mutation);
} catch (InterruptedException e) {
@@ -57,43 +57,44 @@ public class TokenFileWordCount extends Configured implements Tool {
}
}
}
-
+
+ @Override
public int run(String[] args) throws Exception {
-
+
String instance = args[0];
String zookeepers = args[1];
String user = args[2];
String tokenFile = args[3];
String input = args[4];
String tableName = args[5];
-
+
Job job = JobUtil.getJob(getConf());
job.setJobName(TokenFileWordCount.class.getName());
job.setJarByClass(this.getClass());
-
+
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, input);
-
+
job.setMapperClass(MapClass.class);
-
+
job.setNumReduceTasks(0);
-
+
job.setOutputFormatClass(AccumuloOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);
-
+
// AccumuloInputFormat not used here, but it uses the same functions.
AccumuloOutputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
AccumuloOutputFormat.setConnectorInfo(job, user, tokenFile);
AccumuloOutputFormat.setCreateTables(job, true);
AccumuloOutputFormat.setDefaultTableName(job, tableName);
-
+
job.waitForCompletion(true);
return 0;
}
-
+
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(CachedConfiguration.getInstance(), new TokenFileWordCount(), args);
+ int res = ToolRunner.run(new Configuration(), new TokenFileWordCount(), args);
System.exit(res);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
index 23d9d47..e0e29ce 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -44,35 +44,35 @@ import com.beust.jcommander.Parameter;
* table.
*/
public class UniqueColumns extends Configured implements Tool {
-
+
private static final Text EMPTY = new Text();
-
+
public static class UMapper extends Mapper<Key,Value,Text,Text> {
private Text temp = new Text();
private static final Text CF = new Text("cf:");
private static final Text CQ = new Text("cq:");
-
+
@Override
public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
temp.set(CF);
ByteSequence cf = key.getColumnFamilyData();
temp.append(cf.getBackingArray(), cf.offset(), cf.length());
context.write(temp, EMPTY);
-
+
temp.set(CQ);
ByteSequence cq = key.getColumnQualifierData();
temp.append(cq.getBackingArray(), cq.offset(), cq.length());
context.write(temp, EMPTY);
}
}
-
+
public static class UReducer extends Reducer<Text,Text,Text,Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(key, EMPTY);
}
}
-
+
static class Opts extends ClientOnRequiredTable {
@Parameter(names = "--output", description = "output directory")
String output;
@@ -81,21 +81,21 @@ public class UniqueColumns extends Configured implements Tool {
@Parameter(names = "--offline", description = "run against an offline table")
boolean offline = false;
}
-
+
@Override
public int run(String[] args) throws Exception {
Opts opts = new Opts();
opts.parseArgs(UniqueColumns.class.getName(), args);
-
+
String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
Job job = JobUtil.getJob(getConf());
job.setJobName(jobName);
job.setJarByClass(this.getClass());
-
+
String clone = opts.tableName;
Connector conn = null;
-
+
opts.setAccumuloConfigs(job);
if (opts.offline) {
@@ -103,41 +103,41 @@ public class UniqueColumns extends Configured implements Tool {
* this example clones the table and takes it offline. If you plan to run map reduce jobs over a table many times, it may be more efficient to compact the
* table, clone it, and then keep using the same clone as input for map reduce.
*/
-
+
conn = opts.getConnector();
clone = opts.tableName + "_" + jobName;
conn.tableOperations().clone(opts.tableName, clone, true, new HashMap<String,String>(), new HashSet<String>());
conn.tableOperations().offline(clone);
-
+
AccumuloInputFormat.setOfflineTableScan(job, true);
AccumuloInputFormat.setInputTableName(job, clone);
}
-
+
job.setInputFormatClass(AccumuloInputFormat.class);
job.setMapperClass(UMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
-
+
job.setCombinerClass(UReducer.class);
job.setReducerClass(UReducer.class);
-
+
job.setNumReduceTasks(opts.reducers);
-
+
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(opts.output));
-
+
job.waitForCompletion(true);
-
+
if (opts.offline) {
conn.tableOperations().delete(clone);
}
-
+
return job.isSuccessful() ? 0 : 1;
}
-
+
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(CachedConfiguration.getInstance(), new UniqueColumns(), args);
+ int res = ToolRunner.run(new Configuration(), new UniqueColumns(), args);
System.exit(res);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/WordCount.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/WordCount.java
index 8ca8cbc..220b85c 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/WordCount.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/WordCount.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -40,22 +40,22 @@ import com.beust.jcommander.Parameter;
*
*/
public class WordCount extends Configured implements Tool {
-
+
static class Opts extends ClientOnRequiredTable {
- @Parameter(names="--input", description="input directory")
+ @Parameter(names = "--input", description = "input directory")
String inputDirectory;
}
-
+
public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
@Override
public void map(LongWritable key, Text value, Context output) throws IOException {
String[] words = value.toString().split("\\s+");
-
+
for (String word : words) {
-
+
Mutation mutation = new Mutation(new Text(word));
mutation.put(new Text("count"), new Text("20080906"), new Value("1".getBytes()));
-
+
try {
output.write(null, mutation);
} catch (InterruptedException e) {
@@ -64,7 +64,8 @@ public class WordCount extends Configured implements Tool {
}
}
}
-
+
+ @Override
public int run(String[] args) throws Exception {
Opts opts = new Opts();
opts.parseArgs(WordCount.class.getName(), args);
@@ -72,14 +73,14 @@ public class WordCount extends Configured implements Tool {
Job job = JobUtil.getJob(getConf());
job.setJobName(WordCount.class.getName());
job.setJarByClass(this.getClass());
-
+
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, new Path(opts.inputDirectory));
-
+
job.setMapperClass(MapClass.class);
-
+
job.setNumReduceTasks(0);
-
+
job.setOutputFormatClass(AccumuloOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);
@@ -87,8 +88,8 @@ public class WordCount extends Configured implements Tool {
job.waitForCompletion(true);
return 0;
}
-
+
public static void main(String[] args) throws Exception {
- ToolRunner.run(CachedConfiguration.getInstance(), new WordCount(), args);
+ ToolRunner.run(new Configuration(), new WordCount(), args);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/BulkIngestExample.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/BulkIngestExample.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/BulkIngestExample.java
index 5f9b975..72bd7eb 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/BulkIngestExample.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/BulkIngestExample.java
@@ -27,7 +27,6 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.examples.simple.mapreduce.JobUtil;
import org.apache.commons.codec.binary.Base64;
@@ -53,7 +52,7 @@ public class BulkIngestExample extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
private Text outputKey = new Text();
private Text outputValue = new Text();
-
+
@Override
public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException {
// split on tab
@@ -64,7 +63,7 @@ public class BulkIngestExample extends Configured implements Tool {
break;
}
}
-
+
if (index > 0) {
outputKey.set(value.getBytes(), 0, index);
outputValue.set(value.getBytes(), index + 1, value.getLength() - (index + 1));
@@ -72,8 +71,9 @@ public class BulkIngestExample extends Configured implements Tool {
}
}
}
-
+
public static class ReduceClass extends Reducer<Text,Text,Key,Value> {
+ @Override
public void reduce(Text key, Iterable<Text> values, Context output) throws IOException, InterruptedException {
// be careful with the timestamp... if you run on a cluster
// where the time is whacked you may not see your updates in
@@ -82,82 +82,83 @@ public class BulkIngestExample extends Configured implements Tool {
// cluster or consider using logical time... one options is
// to let accumulo set the time
long timestamp = System.currentTimeMillis();
-
+
int index = 0;
for (Text value : values) {
Key outputKey = new Key(key, new Text("colf"), new Text(String.format("col_%07d", index)), timestamp);
index++;
-
+
Value outputValue = new Value(value.getBytes(), 0, value.getLength());
output.write(outputKey, outputValue);
}
}
}
-
+
static class Opts extends ClientOnRequiredTable {
- @Parameter(names="--inputDir", required=true)
+ @Parameter(names = "--inputDir", required = true)
String inputDir;
- @Parameter(names="--workDir", required=true)
+ @Parameter(names = "--workDir", required = true)
String workDir;
}
-
+
+ @Override
public int run(String[] args) {
Opts opts = new Opts();
opts.parseArgs(BulkIngestExample.class.getName(), args);
-
+
Configuration conf = getConf();
PrintStream out = null;
try {
Job job = JobUtil.getJob(conf);
job.setJobName("bulk ingest example");
job.setJarByClass(this.getClass());
-
+
job.setInputFormatClass(TextInputFormat.class);
-
+
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
-
+
job.setReducerClass(ReduceClass.class);
job.setOutputFormatClass(AccumuloFileOutputFormat.class);
opts.setAccumuloConfigs(job);
-
+
Connector connector = opts.getConnector();
-
+
TextInputFormat.setInputPaths(job, new Path(opts.inputDir));
AccumuloFileOutputFormat.setOutputPath(job, new Path(opts.workDir + "/files"));
-
+
FileSystem fs = FileSystem.get(conf);
out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.workDir + "/splits.txt"))));
-
+
Collection<Text> splits = connector.tableOperations().listSplits(opts.tableName, 100);
for (Text split : splits)
out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
-
+
job.setNumReduceTasks(splits.size() + 1);
out.close();
-
+
job.setPartitionerClass(RangePartitioner.class);
RangePartitioner.setSplitFile(job, opts.workDir + "/splits.txt");
-
+
job.waitForCompletion(true);
Path failures = new Path(opts.workDir, "failures");
fs.delete(failures, true);
fs.mkdirs(new Path(opts.workDir, "failures"));
connector.tableOperations().importDirectory(opts.tableName, opts.workDir + "/files", opts.workDir + "/failures", false);
-
+
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (out != null)
out.close();
}
-
+
return 0;
}
-
+
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(CachedConfiguration.getInstance(), new BulkIngestExample(), args);
+ int res = ToolRunner.run(new Configuration(), new BulkIngestExample(), args);
System.exit(res);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/GenerateTestData.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/GenerateTestData.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/GenerateTestData.java
index c1a13b3..5cb4a0b 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/GenerateTestData.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/GenerateTestData.java
@@ -20,34 +20,34 @@ import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.PrintStream;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.beust.jcommander.Parameter;
public class GenerateTestData {
-
+
static class Opts extends org.apache.accumulo.core.cli.Help {
- @Parameter(names="--start-row", required=true)
+ @Parameter(names = "--start-row", required = true)
int startRow = 0;
- @Parameter(names="--count", required=true)
+ @Parameter(names = "--count", required = true)
int numRows = 0;
- @Parameter(names="--output", required=true)
+ @Parameter(names = "--output", required = true)
String outputFile;
}
-
+
public static void main(String[] args) throws IOException {
Opts opts = new Opts();
opts.parseArgs(GenerateTestData.class.getName(), args);
-
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+
+ FileSystem fs = FileSystem.get(new Configuration());
PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.outputFile))));
-
+
for (int i = 0; i < opts.numRows; i++) {
out.println(String.format("row_%010d\tvalue_%010d", i + opts.startRow, i + opts.startRow));
}
out.close();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java b/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
index 3d99838..dab1e10 100644
--- a/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
+++ b/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
@@ -34,8 +34,8 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.examples.simple.mapreduce.JobUtil;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
@@ -48,12 +48,12 @@ public class ChunkInputFormatTest extends TestCase {
private static AssertionError e1 = null;
private static AssertionError e2 = null;
private static IOException e3 = null;
-
+
private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
-
+
private static List<Entry<Key,Value>> data;
private static List<Entry<Key,Value>> baddata;
-
+
{
data = new ArrayList<Entry<Key,Value>>();
ChunkInputStreamTest.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
@@ -71,16 +71,16 @@ public class ChunkInputFormatTest extends TestCase {
ChunkInputStreamTest.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
ChunkInputStreamTest.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
}
-
+
public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
assertEquals(e1.getKey(), e2.getKey());
assertEquals(e1.getValue(), e2.getValue());
}
-
+
public static class CIFTester extends Configured implements Tool {
public static class TestMapper extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
int count = 0;
-
+
@Override
protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
byte[] b = new byte[20];
@@ -113,7 +113,7 @@ public class ChunkInputFormatTest extends TestCase {
}
count++;
}
-
+
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
@@ -123,10 +123,10 @@ public class ChunkInputFormatTest extends TestCase {
}
}
}
-
+
public static class TestNoClose extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
int count = 0;
-
+
@Override
protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
byte[] b = new byte[5];
@@ -152,7 +152,7 @@ public class ChunkInputFormatTest extends TestCase {
}
}
}
-
+
public static class TestBadData extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
@Override
protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
@@ -182,13 +182,13 @@ public class ChunkInputFormatTest extends TestCase {
} catch (Exception e) {}
}
}
-
+
@Override
public int run(String[] args) throws Exception {
if (args.length != 5) {
throw new IllegalArgumentException("Usage : " + CIFTester.class.getName() + " <instance name> <user> <pass> <table> <mapperClass>");
}
-
+
String instance = args[0];
String user = args[1];
String pass = args[2];
@@ -197,39 +197,39 @@ public class ChunkInputFormatTest extends TestCase {
Job job = JobUtil.getJob(getConf());
job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
-
+
job.setInputFormatClass(ChunkInputFormat.class);
-
+
ChunkInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
ChunkInputFormat.setInputTableName(job, table);
ChunkInputFormat.setScanAuthorizations(job, AUTHS);
ChunkInputFormat.setMockInstance(job, instance);
-
+
@SuppressWarnings("unchecked")
Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class.forName(args[4]);
job.setMapperClass(forName);
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Value.class);
job.setOutputFormatClass(NullOutputFormat.class);
-
+
job.setNumReduceTasks(0);
-
+
job.waitForCompletion(true);
-
+
return job.isSuccessful() ? 0 : 1;
}
-
+
public static int main(String[] args) throws Exception {
- return ToolRunner.run(CachedConfiguration.getInstance(), new CIFTester(), args);
+ return ToolRunner.run(new Configuration(), new CIFTester(), args);
}
}
-
+
public void test() throws Exception {
MockInstance instance = new MockInstance("instance1");
Connector conn = instance.getConnector("root", new PasswordToken(""));
conn.tableOperations().create("test");
BatchWriter bw = conn.createBatchWriter("test", new BatchWriterConfig());
-
+
for (Entry<Key,Value> e : data) {
Key k = e.getKey();
Mutation m = new Mutation(k.getRow());
@@ -237,18 +237,18 @@ public class ChunkInputFormatTest extends TestCase {
bw.addMutation(m);
}
bw.close();
-
+
assertEquals(0, CIFTester.main(new String[] {"instance1", "root", "", "test", CIFTester.TestMapper.class.getName()}));
assertNull(e1);
assertNull(e2);
}
-
+
public void testErrorOnNextWithoutClose() throws Exception {
MockInstance instance = new MockInstance("instance2");
Connector conn = instance.getConnector("root", new PasswordToken(""));
conn.tableOperations().create("test");
BatchWriter bw = conn.createBatchWriter("test", new BatchWriterConfig());
-
+
for (Entry<Key,Value> e : data) {
Key k = e.getKey();
Mutation m = new Mutation(k.getRow());
@@ -256,13 +256,13 @@ public class ChunkInputFormatTest extends TestCase {
bw.addMutation(m);
}
bw.close();
-
+
assertEquals(1, CIFTester.main(new String[] {"instance2", "root", "", "test", CIFTester.TestNoClose.class.getName()}));
assertNull(e1);
assertNull(e2);
assertNotNull(e3);
}
-
+
public void testInfoWithoutChunks() throws Exception {
MockInstance instance = new MockInstance("instance3");
Connector conn = instance.getConnector("root", new PasswordToken(""));
@@ -275,7 +275,7 @@ public class ChunkInputFormatTest extends TestCase {
bw.addMutation(m);
}
bw.close();
-
+
assertEquals(0, CIFTester.main(new String[] {"instance3", "root", "", "test", CIFTester.TestBadData.class.getName()}));
assertNull(e0);
assertNull(e1);