Posted to commits@accumulo.apache.org by md...@apache.org on 2014/03/28 22:26:20 UTC
[16/19] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT
Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT
Conflicts:
src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0d2cd1c0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0d2cd1c0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0d2cd1c0
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 0d2cd1c06cef923aa4026d9bee3df1966ee50d9c
Parents: 2cca3ee f0759dc
Author: Mike Drob <md...@cloudera.com>
Authored: Fri Mar 28 17:23:48 2014 -0400
Committer: Mike Drob <md...@cloudera.com>
Committed: Fri Mar 28 17:23:48 2014 -0400
----------------------------------------------------------------------
.../simple/mapreduce/TeraSortIngest.java | 4 +-
.../accumulo/server/util/CountRowKeys.java | 3 +-
.../server/util/reflection/CounterUtils.java | 43 ++++++++++++++++++++
.../test/continuous/ContinuousMoru.java | 3 +-
.../test/continuous/ContinuousVerify.java | 29 +++----------
.../accumulo/test/functional/RunTests.java | 3 +-
6 files changed, 56 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
----------------------------------------------------------------------
diff --cc examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
index f06aeec,0000000..f131e6c
mode 100644,000000..100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
@@@ -1,404 -1,0 +1,404 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.examples.simple.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Generate the *almost* official terasort input data set (see below). The user specifies the number of rows and the output table, and this class runs a
+ * map/reduce program to generate the data. The format of the data is:
+ * <ul>
+ * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n
+ * <li>The keys are random characters from the set ' ' .. '~'.
+ * <li>The rowid is the right justified row id as an int.
+ * <li>The filler consists of 7 runs of 10 characters from 'A' to 'Z'.
+ * </ul>
+ *
+ * This TeraSort is slightly modified to allow for variable length keys and values. The total row length is not variable. To generate a terabyte of data in
+ * the same way TeraSort does, use 10000000000 rows with a 10/10 byte key length and a 78/78 byte value length. Along with the 10 byte row id and \r\n, this gives
+ * 100 bytes per row * 10000000000 rows = 1 TB. Min/max ranges for the key and value parameters are inclusive.
+ *
+ *
+ */
+public class TeraSortIngest extends Configured implements Tool {
+ /**
+ * An input format that assigns ranges of longs to each mapper.
+ */
+ static class RangeInputFormat extends InputFormat<LongWritable,NullWritable> {
+ /**
+ * An input split consisting of a range of numbers.
+ */
+ static class RangeInputSplit extends InputSplit implements Writable {
+ long firstRow;
+ long rowCount;
+
+ public RangeInputSplit() {}
+
+ public RangeInputSplit(long offset, long length) {
+ firstRow = offset;
+ rowCount = length;
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return new String[] {};
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ firstRow = WritableUtils.readVLong(in);
+ rowCount = WritableUtils.readVLong(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVLong(out, firstRow);
+ WritableUtils.writeVLong(out, rowCount);
+ }
+ }
+
+ /**
+ * A record reader that will generate a range of numbers.
+ */
+ static class RangeRecordReader extends RecordReader<LongWritable,NullWritable> {
+ long startRow;
+ long finishedRows;
+ long totalRows;
+
+ LongWritable currentKey;
+
+ public RangeRecordReader(RangeInputSplit split) {
+ startRow = split.firstRow;
+ finishedRows = 0;
+ totalRows = split.rowCount;
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public float getProgress() throws IOException {
+ return finishedRows / (float) totalRows;
+ }
+
+ @Override
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return new LongWritable(startRow + finishedRows);
+ }
+
+ @Override
+ public NullWritable getCurrentValue() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {}
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (finishedRows < totalRows) {
+ ++finishedRows;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
+ // reporter.setStatus("Creating record reader");
+ return new RangeRecordReader((RangeInputSplit) split);
+ }
+
+ /**
+ * Create the desired number of splits, dividing the number of rows between the mappers.
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext job) {
- long totalRows = job.getConfiguration().getLong(NUMROWS, 0);
- int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1);
++ long totalRows = InputFormatBase.getConfiguration(job).getLong(NUMROWS, 0);
++ int numSplits = InputFormatBase.getConfiguration(job).getInt(NUMSPLITS, 1);
+ long rowsPerSplit = totalRows / numSplits;
+ System.out.println("Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit);
+ ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+ long currentRow = 0;
+ for (int split = 0; split < numSplits - 1; ++split) {
+ splits.add(new RangeInputSplit(currentRow, rowsPerSplit));
+ currentRow += rowsPerSplit;
+ }
+ splits.add(new RangeInputSplit(currentRow, totalRows - currentRow));
+ System.out.println("Done Generating.");
+ return splits;
+ }
+
+ }
+
+ private static String NUMSPLITS = "terasort.overridesplits";
+ private static String NUMROWS = "terasort.numrows";
+
+ static class RandomGenerator {
+ private long seed = 0;
+ private static final long mask32 = (1l << 32) - 1;
+ /**
+ * The number of iterations separating the precomputed seeds.
+ */
+ private static final int seedSkip = 128 * 1024 * 1024;
+ /**
+ * The precomputed seed values after every seedSkip iterations. There should be enough values so that all 2**32 iterations are covered.
+ */
+ private static final long[] seeds = new long[] {0L, 4160749568L, 4026531840L, 3892314112L, 3758096384L, 3623878656L, 3489660928L, 3355443200L, 3221225472L,
+ 3087007744L, 2952790016L, 2818572288L, 2684354560L, 2550136832L, 2415919104L, 2281701376L, 2147483648L, 2013265920L, 1879048192L, 1744830464L,
+ 1610612736L, 1476395008L, 1342177280L, 1207959552L, 1073741824L, 939524096L, 805306368L, 671088640L, 536870912L, 402653184L, 268435456L, 134217728L,};
+
+ /**
+ * Start the random number generator on the given iteration.
+ *
+ * @param initalIteration
+ * the iteration number to start on
+ */
+ RandomGenerator(long initalIteration) {
+ int baseIndex = (int) ((initalIteration & mask32) / seedSkip);
+ seed = seeds[baseIndex];
+ for (int i = 0; i < initalIteration % seedSkip; ++i) {
+ next();
+ }
+ }
+
+ RandomGenerator() {
+ this(0);
+ }
+
+ long next() {
+ seed = (seed * 3141592621l + 663896637) & mask32;
+ return seed;
+ }
+ }
+
+ /**
+ * The Mapper class that, given a row number, will generate the appropriate output line.
+ */
+ public static class SortGenMapper extends Mapper<LongWritable,NullWritable,Text,Mutation> {
+ private Text table = null;
+ private int minkeylength = 0;
+ private int maxkeylength = 0;
+ private int minvaluelength = 0;
+ private int maxvaluelength = 0;
+
+ private Text key = new Text();
+ private Text value = new Text();
+ private RandomGenerator rand;
+ private byte[] keyBytes; // = new byte[12];
+ private byte[] spaces = " ".getBytes();
+ private byte[][] filler = new byte[26][];
+ {
+ for (int i = 0; i < 26; ++i) {
+ filler[i] = new byte[10];
+ for (int j = 0; j < 10; ++j) {
+ filler[i][j] = (byte) ('A' + i);
+ }
+ }
+ }
+
+ /**
+ * Add a random key to the text
+ */
+ private Random random = new Random();
+
+ private void addKey() {
+ int range = random.nextInt(maxkeylength - minkeylength + 1);
+ int keylen = range + minkeylength;
+ int keyceil = keylen + (4 - (keylen % 4));
+ keyBytes = new byte[keyceil];
+
+ long temp = 0;
+ for (int i = 0; i < keyceil / 4; i++) {
+ temp = rand.next() / 52;
+ keyBytes[3 + 4 * i] = (byte) (' ' + (temp % 95));
+ temp /= 95;
+ keyBytes[2 + 4 * i] = (byte) (' ' + (temp % 95));
+ temp /= 95;
+ keyBytes[1 + 4 * i] = (byte) (' ' + (temp % 95));
+ temp /= 95;
+ keyBytes[4 * i] = (byte) (' ' + (temp % 95));
+ }
+ key.set(keyBytes, 0, keylen);
+ }
+
+ /**
+ * Build the right justified, space padded rowid string for the row.
+ *
+ * @param rowId
+ */
+ private Text getRowIdString(long rowId) {
+ Text paddedRowIdString = new Text();
+ byte[] rowid = Integer.toString((int) rowId).getBytes();
+ int padSpace = 10 - rowid.length;
+ if (padSpace > 0) {
+ paddedRowIdString.append(spaces, 0, 10 - rowid.length);
+ }
+ paddedRowIdString.append(rowid, 0, Math.min(rowid.length, 10));
+ return paddedRowIdString;
+ }
+
+ /**
+ * Add the required filler bytes. Each row consists of 7 blocks of 10 characters and 1 block of 8 characters.
+ *
+ * @param rowId
+ * the current row number
+ */
+ private void addFiller(long rowId) {
+ int base = (int) ((rowId * 8) % 26);
+
+ // Get Random var
+ Random random = new Random(rand.seed);
+
+ int range = random.nextInt(maxvaluelength - minvaluelength + 1);
+ int valuelen = range + minvaluelength;
+
+ while (valuelen > 10) {
+ value.append(filler[(base + valuelen) % 26], 0, 10);
+ valuelen -= 10;
+ }
+
+ if (valuelen > 0)
+ value.append(filler[(base + valuelen) % 26], 0, valuelen);
+ }
+
+ @Override
+ public void map(LongWritable row, NullWritable ignored, Context context) throws IOException, InterruptedException {
+ context.setStatus("Entering");
+ long rowId = row.get();
+ if (rand == null) {
+ // we use 3 random numbers per row
+ rand = new RandomGenerator(rowId * 3);
+ }
+ addKey();
+ value.clear();
+ // addRowId(rowId);
+ addFiller(rowId);
+
+ // New
+ Mutation m = new Mutation(key);
+ m.put(new Text("c"), // column family
+ getRowIdString(rowId), // column qual
+ new Value(value.toString().getBytes())); // data
+
+ context.setStatus("About to add to accumulo");
+ context.write(table, m);
+ context.setStatus("Added to accumulo " + key.toString());
+ }
+
+ @Override
+ public void setup(Context job) {
+ minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0);
+ maxkeylength = job.getConfiguration().getInt("cloudgen.maxkeylength", 0);
+ minvaluelength = job.getConfiguration().getInt("cloudgen.minvaluelength", 0);
+ maxvaluelength = job.getConfiguration().getInt("cloudgen.maxvaluelength", 0);
+ table = new Text(job.getConfiguration().get("cloudgen.tablename"));
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(CachedConfiguration.getInstance(), new TeraSortIngest(), args);
+ System.exit(res);
+ }
+
+ static class Opts extends ClientOnRequiredTable {
+ @Parameter(names = "--count", description = "number of rows to ingest", required = true)
+ long numRows;
+ @Parameter(names = {"-nk", "--minKeySize"}, description = "miniumum key size", required = true)
+ int minKeyLength;
+ @Parameter(names = {"-xk", "--maxKeySize"}, description = "maximum key size", required = true)
+ int maxKeyLength;
+ @Parameter(names = {"-nv", "--minValueSize"}, description = "minimum key size", required = true)
+ int minValueLength;
+ @Parameter(names = {"-xv", "--maxValueSize"}, description = "maximum key size", required = true)
+ int maxValueLength;
+ @Parameter(names = "--splits", description = "number of splits to create in the table")
+ int splits = 0;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Job job = new Job(getConf(), "TeraSortCloud");
+ job.setJarByClass(this.getClass());
+ Opts opts = new Opts();
+ opts.parseArgs(TeraSortIngest.class.getName(), args);
+
+ job.setInputFormatClass(RangeInputFormat.class);
+ job.setMapperClass(SortGenMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Mutation.class);
+
+ job.setNumReduceTasks(0);
+
+ job.setOutputFormatClass(AccumuloOutputFormat.class);
+ opts.setAccumuloConfigs(job);
+ BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(10L * 1000 * 1000);
+ AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+
+ Configuration conf = job.getConfiguration();
+ conf.setLong(NUMROWS, opts.numRows);
+ conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
+ conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength);
+ conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
+ conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
+ conf.set("cloudgen.tablename", opts.tableName);
+
+ if (args.length > 10)
+ conf.setInt(NUMSPLITS, opts.splits);
+
+ job.waitForCompletion(true);
+ return job.isSuccessful() ? 0 : 1;
+ }
+}
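
For context, the settings described in the class Javadoc above map directly onto the Opts flags in this file. Below is a minimal driver sketch (not part of this commit) showing how TeraSortIngest might be launched with the terabyte-scale parameters the Javadoc mentions. The class name TeraSortIngestDriver and the --splits value are illustrative, and the required connection/table options inherited from ClientOnRequiredTable are not shown in this diff, so they are only referenced in a comment.

import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.examples.simple.mapreduce.TeraSortIngest;
import org.apache.hadoop.util.ToolRunner;

public class TeraSortIngestDriver {
  public static void main(String[] args) throws Exception {
    // 100 bytes per row * 10000000000 rows = 1 TB, per the class Javadoc
    String[] ingestArgs = {
        "--count", "10000000000",  // number of rows to ingest
        "-nk", "10", "-xk", "10",  // min/max key length
        "-nv", "78", "-xv", "78",  // min/max value length
        "--splits", "400"          // optional pre-splits; the value here is illustrative
        // the required instance/zookeeper/credential/table options from
        // ClientOnRequiredTable must also be appended to this argument list
    };
    int rc = ToolRunner.run(CachedConfiguration.getInstance(), new TeraSortIngest(), ingestArgs);
    System.exit(rc);
  }
}
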
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
index 5676394,0000000..88b2dfb
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
@@@ -1,86 -1,0 +1,87 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.ServerConstants;
++import org.apache.accumulo.server.util.reflection.CounterUtils;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class CountRowKeys extends Configured implements Tool {
+ private static class MyMapper extends Mapper<Key,Value,Text,NullWritable> {
+ Text k = new Text();
+
+ public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+ context.write(key.getRow(k), NullWritable.get());
+ }
+ }
+
+ private static class MyReducer extends Reducer<Text,NullWritable,Text,Text> {
+ public enum Count {
+ uniqueRows
+ }
+
+ public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException {
- context.getCounter(Count.uniqueRows).increment(1);
++ CounterUtils.increment(context.getCounter(Count.uniqueRows));
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+ if (args.length != 2) {
+ System.out.println("Usage: CountRowKeys tableName outputPath");
+ return 1;
+ }
+
+ Job job = new Job(getConf(), this.getClass().getName());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ SequenceFileInputFormat.addInputPath(job, new Path(ServerConstants.getTablesDir() + "/" + args[0] + "/*/*/data"));
+
+ job.setMapperClass(MyMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(NullWritable.class);
+
+ job.setReducerClass(MyReducer.class);
+
+ TextOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ job.waitForCompletion(true);
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(CachedConfiguration.getInstance(), new CountRowKeys(), args);
+ if (res != 0)
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java
index 0000000,0000000..dbd5f60
new file mode 100644
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java
@@@ -1,0 -1,0 +1,43 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.server.util.reflection;
++
++import java.lang.reflect.Method;
++
++import org.apache.hadoop.mapreduce.Counter;
++
++/**
++ * Utility class for incrementing counters in a way that is compatible with both hadoop 1 and hadoop 2
++ */
++public class CounterUtils {
++ static private Method INCREMENT;
++ static {
++ try {
++ INCREMENT = Counter.class.getMethod("increment", Long.TYPE);
++ } catch (Exception ex) {
++ throw new RuntimeException(ex);
++ }
++ }
++
++ public static void increment(Counter counter) {
++ try {
++ INCREMENT.invoke(counter, 1L);
++ } catch (Exception ex) {
++ throw new RuntimeException(ex);
++ }
++ }
++}
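
The new CounterUtils class centralizes the hadoop-1/hadoop-2 workaround that previously lived in ContinuousVerify (removed further down in this diff): Counter changed from a class in Hadoop 1 to an interface in Hadoop 2, so a directly compiled increment() call can fail at runtime against the other version, while invoking it reflectively keeps one compiled artifact compatible with both. A minimal sketch (not from this commit) of how a mapper would use it; ExampleCounterMapper and the Stats enum are hypothetical names.

import java.io.IOException;

import org.apache.accumulo.server.util.reflection.CounterUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ExampleCounterMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
  // counters are declared as an enum, as in CountRowKeys and ContinuousVerify
  public enum Stats {
    LINES_SEEN
  }

  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // instead of context.getCounter(Stats.LINES_SEEN).increment(1), which binds to either a
    // class method (Hadoop 1) or an interface method (Hadoop 2) at compile time, route the
    // update through the reflective helper so one build runs on both
    CounterUtils.increment(context.getCounter(Stats.LINES_SEEN));
    context.write(value, NullWritable.get());
  }
}
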
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
index bbe7fa3,0000000..a35ca66
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
@@@ -1,180 -1,0 +1,181 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.continuous;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.CachedConfiguration;
++import org.apache.accumulo.server.util.reflection.CounterUtils;
+import org.apache.accumulo.test.continuous.ContinuousIngest.BaseOpts;
+import org.apache.accumulo.test.continuous.ContinuousIngest.ShortConverter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
+/**
+ * A map-only job that reads a table created by continuous ingest and creates a doubly linked list. It tests the ability of a map-only job to read and write
+ * to Accumulo at the same time, and it mutates the table in such a way that it should not create any undefined nodes.
+ *
+ */
+public class ContinuousMoru extends Configured implements Tool {
+ private static final String PREFIX = ContinuousMoru.class.getSimpleName() + ".";
+ private static final String MAX_CQ = PREFIX + "MAX_CQ";
+ private static final String MAX_CF = PREFIX + "MAX_CF";
+ private static final String MAX = PREFIX + "MAX";
+ private static final String MIN = PREFIX + "MIN";
+ private static final String CI_ID = PREFIX + "CI_ID";
+
+ static enum Counts {
+ SELF_READ;
+ }
+
+ public static class CMapper extends Mapper<Key,Value,Text,Mutation> {
+
+ private short max_cf;
+ private short max_cq;
+ private Random random;
+ private String ingestInstanceId;
+ private byte[] iiId;
+ private long count;
+
+ private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility();
+
+ @Override
+ public void setup(Context context) throws IOException, InterruptedException {
+ int max_cf = context.getConfiguration().getInt(MAX_CF, -1);
+ int max_cq = context.getConfiguration().getInt(MAX_CQ, -1);
+
+ if (max_cf > Short.MAX_VALUE || max_cq > Short.MAX_VALUE)
+ throw new IllegalArgumentException();
+
+ this.max_cf = (short) max_cf;
+ this.max_cq = (short) max_cq;
+
+ random = new Random();
+ ingestInstanceId = context.getConfiguration().get(CI_ID);
+ iiId = ingestInstanceId.getBytes(Constants.UTF8);
+
+ count = 0;
+ }
+
+ @Override
+ public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
+
+ ContinuousWalk.validate(key, data);
+
+ if (WritableComparator.compareBytes(iiId, 0, iiId.length, data.get(), 0, iiId.length) != 0) {
+ // only rewrite data not written by this M/R job
+ byte[] val = data.get();
+
+ int offset = ContinuousWalk.getPrevRowOffset(val);
+ if (offset > 0) {
+ long rowLong = Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16);
+ Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), EMPTY_VIS, iiId, count++, key.getRowData()
+ .toArray(), random, true);
+ context.write(null, m);
+ }
+
+ } else {
- ContinuousVerify.increment(context.getCounter(Counts.SELF_READ));
++ CounterUtils.increment(context.getCounter(Counts.SELF_READ));
+ }
+ }
+ }
+
+ static class Opts extends BaseOpts {
+ @Parameter(names = "--maxColF", description = "maximum column family value to use", converter=ShortConverter.class)
+ short maxColF = Short.MAX_VALUE;
+
+ @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter=ShortConverter.class)
+ short maxColQ = Short.MAX_VALUE;
+
+ @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
+ int maxMaps = 0;
+ }
+
+ @Override
+ public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException {
+ Opts opts = new Opts();
+ BatchWriterOpts bwOpts = new BatchWriterOpts();
+ opts.parseArgs(ContinuousMoru.class.getName(), args, bwOpts);
+
+ Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+ opts.setAccumuloConfigs(job);
+
+ // set up ranges
+ try {
+ Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
+ AccumuloInputFormat.setRanges(job, ranges);
+ AccumuloInputFormat.setAutoAdjustRanges(job, false);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ job.setMapperClass(CMapper.class);
+
+ job.setNumReduceTasks(0);
+
+ job.setOutputFormatClass(AccumuloOutputFormat.class);
+ AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig());
+
+ Configuration conf = job.getConfiguration();
+ conf.setLong(MIN, opts.min);
+ conf.setLong(MAX, opts.max);
+ conf.setInt(MAX_CF, opts.maxColF);
+ conf.setInt(MAX_CQ, opts.maxColQ);
+ conf.set(CI_ID, UUID.randomUUID().toString());
+
+ job.waitForCompletion(true);
+ opts.stopTracing();
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ /**
+ *
+ * @param args
+ * instanceName zookeepers username password table columns outputpath
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousMoru(), args);
+ if (res != 0)
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
index 07e0c92,0000000..8095b50
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
@@@ -1,243 -1,0 +1,224 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.continuous;
+
+import java.io.IOException;
- import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
++import org.apache.accumulo.server.util.reflection.CounterUtils;
+import org.apache.accumulo.test.continuous.ContinuousWalk.BadChecksumException;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
+/**
+ * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined.
+ */
+
+public class ContinuousVerify extends Configured implements Tool {
-
- // work around hadoop-1/hadoop-2 runtime incompatibility
- static private Method INCREMENT;
- static {
- try {
- INCREMENT = Counter.class.getMethod("increment", Long.TYPE);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- static void increment(Object obj) {
- try {
- INCREMENT.invoke(obj, 1L);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
+ public static final VLongWritable DEF = new VLongWritable(-1);
+
+ public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> {
+
+ private LongWritable row = new LongWritable();
+ private LongWritable ref = new LongWritable();
+ private VLongWritable vrow = new VLongWritable();
+
+ private long corrupt = 0;
+
+ @Override
+ public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
+ long r = Long.parseLong(key.getRow().toString(), 16);
+ if (r < 0)
+ throw new IllegalArgumentException();
+
+ try {
+ ContinuousWalk.validate(key, data);
+ } catch (BadChecksumException bce) {
- increment(context.getCounter(Counts.CORRUPT));
++ CounterUtils.increment(context.getCounter(Counts.CORRUPT));
+ if (corrupt < 1000) {
+ System.out.println("ERROR Bad checksum : " + key);
+ } else if (corrupt == 1000) {
+ System.out.println("Too many bad checksums, not printing anymore!");
+ }
+ corrupt++;
+ return;
+ }
+
+ row.set(r);
+
+ context.write(row, DEF);
+ byte[] val = data.get();
+
+ int offset = ContinuousWalk.getPrevRowOffset(val);
+ if (offset > 0) {
+ ref.set(Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16));
+ vrow.set(r);
+ context.write(ref, vrow);
+ }
+ }
+ }
+
+ public static enum Counts {
+ UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
+ }
+
+ public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> {
+ private ArrayList<Long> refs = new ArrayList<Long>();
+
+ @Override
+ public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException {
+
+ int defCount = 0;
+
+ refs.clear();
+ for (VLongWritable type : values) {
+ if (type.get() == -1) {
+ defCount++;
+ } else {
+ refs.add(type.get());
+ }
+ }
+
+ if (defCount == 0 && refs.size() > 0) {
+ StringBuilder sb = new StringBuilder();
+ String comma = "";
+ for (Long ref : refs) {
+ sb.append(comma);
+ comma = ",";
+ sb.append(new String(ContinuousIngest.genRow(ref), Constants.UTF8));
+ }
+
+ context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
- increment(context.getCounter(Counts.UNDEFINED));
++ CounterUtils.increment(context.getCounter(Counts.UNDEFINED));
+
+ } else if (defCount > 0 && refs.size() == 0) {
- increment(context.getCounter(Counts.UNREFERENCED));
++ CounterUtils.increment(context.getCounter(Counts.UNREFERENCED));
+ } else {
- increment(context.getCounter(Counts.REFERENCED));
++ CounterUtils.increment(context.getCounter(Counts.REFERENCED));
+ }
+
+ }
+ }
+
+ static class Opts extends ClientOnDefaultTable {
+ @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist", required = true)
+ String outputDir = "/tmp/continuousVerify";
+
+ @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
+ int maxMaps = 0;
+
+ @Parameter(names = "--reducers", description = "the number of reducers to use", required = true, validateWith = PositiveInteger.class)
+ int reducers = 0;
+
+ @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline")
+ boolean scanOffline = false;
+
+ public Opts() {
+ super("ci");
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Opts opts = new Opts();
+ opts.parseArgs(this.getClass().getName(), args);
+
+ Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+ opts.setAccumuloConfigs(job);
+
+ Set<Range> ranges = null;
+ String clone = opts.getTableName();
+ Connector conn = null;
+
+ if (opts.scanOffline) {
+ Random random = new Random();
+ clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl));
+ conn = opts.getConnector();
+ conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>());
+ ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
+ conn.tableOperations().offline(clone);
+ AccumuloInputFormat.setInputTableName(job, clone);
+ AccumuloInputFormat.setOfflineTableScan(job, true);
+ } else {
+ ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
+ }
+
+ AccumuloInputFormat.setRanges(job, ranges);
+ AccumuloInputFormat.setAutoAdjustRanges(job, false);
+
+ job.setMapperClass(CMapper.class);
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(VLongWritable.class);
+
+ job.setReducerClass(CReducer.class);
+ job.setNumReduceTasks(opts.reducers);
+
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline);
+
+ TextOutputFormat.setOutputPath(job, new Path(opts.outputDir));
+
+ job.waitForCompletion(true);
+
+ if (opts.scanOffline) {
+ conn.tableOperations().delete(clone);
+ }
+ opts.stopTracing();
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ /**
+ *
+ * @param args
+ * instanceName zookeepers username password table columns outputpath
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args);
+ if (res != 0)
+ System.exit(res);
+ }
+}
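
Not introduced by this commit, but as a usage note: the Counts values recorded above are the job's externally visible result, and a caller that runs ContinuousVerify typically inspects them on the completed Job. A small hypothetical sketch, assuming a handle on the finished Job; the class name VerifyResultCheck is illustrative.

import org.apache.accumulo.test.continuous.ContinuousVerify.Counts;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public class VerifyResultCheck {
  // returns true when the verify job saw no undefined references and no corrupt entries
  public static boolean passed(Job completedJob) throws Exception {
    Counters counters = completedJob.getCounters();
    long referenced = counters.findCounter(Counts.REFERENCED).getValue();
    long unreferenced = counters.findCounter(Counts.UNREFERENCED).getValue();
    long undefined = counters.findCounter(Counts.UNDEFINED).getValue();
    long corrupt = counters.findCounter(Counts.CORRUPT).getValue();
    System.out.println("referenced=" + referenced + " unreferenced=" + unreferenced
        + " undefined=" + undefined + " corrupt=" + corrupt);
    return undefined == 0 && corrupt == 0;
  }
}
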
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
index f6ebe87,0000000..2b775c5
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
@@@ -1,216 -1,0 +1,217 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
++import org.apache.accumulo.server.util.reflection.CounterUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Runs the functional tests via map-reduce.
+ *
+ * First, be sure everything is compiled.
+ *
+ * Second, get a list of the tests you want to run:
+ *
+ * <pre>
+ * $ python test/system/auto/run.py -l > tests
+ * </pre>
+ *
+ * Put the list of tests into HDFS:
+ *
+ * <pre>
+ * $ hadoop fs -put tests /user/hadoop/tests
+ * </pre>
+ *
+ * Run the map-reduce job:
+ *
+ * <pre>
+ * $ ./bin/accumulo accumulo.test.functional.RunTests --tests /user/hadoop/tests --output /user/hadoop/results
+ * </pre>
+ *
+ * Note that you will need to have some configuration in conf/accumulo-site.xml (to locate zookeeper). The map-reduce jobs will not use your local accumulo
+ * instance.
+ *
+ */
+public class RunTests extends Configured implements Tool {
+
+ static final public String JOB_NAME = "Functional Test Runner";
+ private static final Logger log = Logger.getLogger(RunTests.class);
+
+ private Job job = null;
+
+ private static final int DEFAULT_TIMEOUT_FACTOR = 1;
+
+ static class Opts extends Help {
+ @Parameter(names="--tests", description="newline separated list of tests to run", required=true)
+ String testFile;
+ @Parameter(names="--output", description="destination for the results of tests in HDFS", required=true)
+ String outputPath;
+ @Parameter(names="--timeoutFactor", description="Optional scaling factor for timeout for both mapred.task.timeout and -f flag on run.py", required=false)
+ Integer intTimeoutFactor = DEFAULT_TIMEOUT_FACTOR;
+ }
+
+ static final String TIMEOUT_FACTOR = RunTests.class.getName() + ".timeoutFactor";
+
+ static public class TestMapper extends Mapper<LongWritable,Text,Text,Text> {
+
+ private static final String REDUCER_RESULT_START = "::::: ";
+ private static final int RRS_LEN = REDUCER_RESULT_START.length();
+ private Text result = new Text();
+ String mapperTimeoutFactor = null;
+
+ private static enum Outcome {
+ SUCCESS, FAILURE, ERROR, UNEXPECTED_SUCCESS, EXPECTED_FAILURE
+ }
+ private static final Map<Character, Outcome> OUTCOME_COUNTERS;
+ static {
+ OUTCOME_COUNTERS = new java.util.HashMap<Character, Outcome>();
+ OUTCOME_COUNTERS.put('S', Outcome.SUCCESS);
+ OUTCOME_COUNTERS.put('F', Outcome.FAILURE);
+ OUTCOME_COUNTERS.put('E', Outcome.ERROR);
+ OUTCOME_COUNTERS.put('T', Outcome.UNEXPECTED_SUCCESS);
+ OUTCOME_COUNTERS.put('G', Outcome.EXPECTED_FAILURE);
+ }
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ List<String> cmd = Arrays.asList("/usr/bin/python", "test/system/auto/run.py", "-m", "-f", mapperTimeoutFactor, "-t", value.toString());
+ log.info("Running test " + cmd);
+ ProcessBuilder pb = new ProcessBuilder(cmd);
+ pb.directory(new File(context.getConfiguration().get("accumulo.home")));
+ pb.redirectErrorStream(true);
+ Process p = pb.start();
+ p.getOutputStream().close();
+ InputStream out = p.getInputStream();
+ InputStreamReader outr = new InputStreamReader(out, Constants.UTF8);
+ BufferedReader br = new BufferedReader(outr);
+ String line;
+ try {
+ while ((line = br.readLine()) != null) {
+ log.info("More: " + line);
+ if (line.startsWith(REDUCER_RESULT_START)) {
+ String resultLine = line.substring(RRS_LEN);
+ if (resultLine.length() > 0) {
+ Outcome outcome = OUTCOME_COUNTERS.get(resultLine.charAt(0));
+ if (outcome != null) {
- context.getCounter(outcome).increment(1);
++ CounterUtils.increment(context.getCounter(outcome));
+ }
+ }
+ String taskAttemptId = context.getTaskAttemptID().toString();
+ result.set(taskAttemptId + " " + resultLine);
+ context.write(value, result);
+ }
+ }
+ } catch (Exception ex) {
+ log.error(ex);
+ context.progress();
+ }
+
+ p.waitFor();
+ }
+
+ @Override
+ protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException, InterruptedException {
+ mapperTimeoutFactor = Integer.toString(context.getConfiguration().getInt(TIMEOUT_FACTOR, DEFAULT_TIMEOUT_FACTOR));
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ job = new Job(getConf(), JOB_NAME);
+ job.setJarByClass(this.getClass());
+ Opts opts = new Opts();
+ opts.parseArgs(RunTests.class.getName(), args);
+
+ // this is like 1-2 tests per mapper
+ Configuration conf = job.getConfiguration();
+ conf.setInt("mapred.max.split.size", 40);
+ conf.set("accumulo.home", System.getenv("ACCUMULO_HOME"));
+
+ // Use the timeout factor option to scale mapred.task.timeout
+ // and to set TIMEOUT_FACTOR
+ conf.setInt("mapred.task.timeout", opts.intTimeoutFactor * 8 * 60 * 1000);
+ conf.setInt(TIMEOUT_FACTOR, opts.intTimeoutFactor);
+ conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+
+ // set input
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, new Path(opts.testFile));
+
+ // set output
+ job.setOutputFormatClass(TextOutputFormat.class);
+ FileSystem fs = FileSystem.get(conf);
+ Path destination = new Path(opts.outputPath);
+ if (fs.exists(destination)) {
+ log.info("Deleting existing output directory " + opts.outputPath);
+ fs.delete(destination, true);
+ }
+ TextOutputFormat.setOutputPath(job, destination);
+
+ // configure default reducer: put the results into one file
+ job.setNumReduceTasks(1);
+
+ // set mapper
+ job.setMapperClass(TestMapper.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ // don't do anything with the results (yet); a summary would be nice
+ job.setNumReduceTasks(0);
+
+ // submit the job
+ log.info("Starting tests");
+ return 0;
+ }
+
+ /**
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ RunTests tests = new RunTests();
+ ToolRunner.run(new Configuration(), tests, args);
+ tests.job.waitForCompletion(true);
+ if (!tests.job.isSuccessful())
+ System.exit(1);
+ }
+
+}