Posted to commits@hbase.apache.org by en...@apache.org on 2013/10/29 00:09:53 UTC
svn commit: r1536551 - /hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
Author: enis
Date: Mon Oct 28 23:09:52 2013
New Revision: 1536551
URL: http://svn.apache.org/r1536551
Log:
HBASE-9759 Prevent random number collision in IntegrationTestBulkLoad
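
Before this change, every map task seeded its chain id with Math.abs(new Random().nextLong()),
so two tasks could draw the same value and their linked lists would collide, producing spurious
verification failures. The patch instead snaps the random draw down to a multiple of the total
task count and adds the task's own index, so each task's chain id falls in a distinct residue
class. A minimal sketch of the arithmetic (variable names and values here are illustrative,
not the patch's own):

    long raw = Math.abs(new java.util.Random().nextLong());
    int taskId = 3, numMapTasks = 10;                  // example values
    long chainId = raw - (raw % numMapTasks) + taskId;
    // chainId % numMapTasks == taskId, so two tasks with different ids
    // can never produce the same chainId, whatever raw values they draw.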
Modified:
hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java?rev=1536551&r1=1536550&r2=1536551&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java Mon Oct 28 23:09:52 2013
@@ -29,7 +29,10 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
+import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -48,7 +51,7 @@ import org.apache.hadoop.hbase.util.Envi
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
@@ -61,7 +64,6 @@ import org.apache.hadoop.mapreduce.Parti
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
@@ -101,6 +103,8 @@ import org.junit.experimental.categories
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {
+ private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);
+
private static byte[] CHAIN_FAM = Bytes.toBytes("L");
private static byte[] SORT_FAM = Bytes.toBytes("S");
private static byte[] DATA_FAM = Bytes.toBytes("D");
@@ -114,18 +118,24 @@ public class IntegrationTestBulkLoad ext
private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
private static int NUM_IMPORT_ROUNDS = 1;
+ private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";
private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
private static String TABLE_NAME = "IntegrationTestBulkLoad";
@Test
public void testBulkLoad() throws Exception {
+ runLoad();
+ runCheck();
+ }
+
+ public void runLoad() throws Exception {
setupTable();
int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
+ LOG.info("Running load with numIterations:" + numImportRounds);
for (int i = 0; i < numImportRounds; i++) {
runLinkedListMRJob(i);
}
- runCheck();
}
private byte[][] getSplits(int numRegions) {
@@ -156,13 +166,14 @@ public class IntegrationTestBulkLoad ext
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);
+ conf.setInt(ROUND_NUM_KEY, iteration);
Job job = new Job(conf);
job.setJobName(jobName);
// set the input format so that we can create map tasks with no data input.
- job.setInputFormatClass(RandomInputFormat.class);
+ job.setInputFormatClass(ITBulkLoadInputFormat.class);
// Set the mapper classes.
job.setMapperClass(LinkedListCreationMapper.class);
@@ -194,74 +205,78 @@ public class IntegrationTestBulkLoad ext
util.getTestFileSystem().delete(p, true);
}
- /**
- * Class to generate splits. Each split gets a dummy split file. The associated
- * RecordReader generates a single random number.
- *
- * This class is adapted from Hadoop tests.
- */
- static class RandomInputFormat extends InputFormat<Text, LongWritable> {
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- List<InputSplit> result = new ArrayList<InputSplit>();
- int numSplits = job.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
- for (int i = 0; i < numSplits; ++i) {
- result.add(new FileSplit(new Path("/tmp", "dummy-split-" + i), 0, 1, null));
- }
- return result;
- }
-
- /**
- * RecordReader that doesn't read anything. Instead it generates a single random number.
- * This is useful for debugging or starting map tasks with no data inpput.
- *
- * This class is adapted from Hadoop tests.
- */
- static class RandomRecordReader extends RecordReader<Text, LongWritable> {
- Path name;
- Text key = null;
- LongWritable value = new LongWritable();
-
- public RandomRecordReader(Path p) {
- name = p;
- }
-
- public void initialize(InputSplit split,
- TaskAttemptContext context)
- throws IOException, InterruptedException {
-
- }
-
- public boolean nextKeyValue() {
- if (name != null) {
- key = new Text();
- key.set(name.getName());
- name = null;
- value.set(new Random().nextLong());
- return true;
- }
- return false;
- }
-
- public Text getCurrentKey() {
- return key;
- }
+ public static class EmptySplit extends InputSplit implements Writable {
+ @Override
+ public void write(DataOutput out) throws IOException { }
+ @Override
+ public void readFields(DataInput in) throws IOException { }
+ @Override
+ public long getLength() { return 0L; }
+ @Override
+ public String[] getLocations() { return new String[0]; }
+ }
- public LongWritable getCurrentValue() {
- return value;
- }
+ public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
+ private int index = -1;
+ private K[] keys;
+ private V[] values;
- public void close() {
- }
+ public FixedRecordReader(K[] keys, V[] values) {
+ this.keys = keys;
+ this.values = values;
+ }
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
+ InterruptedException { }
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return ++index < keys.length;
+ }
+ @Override
+ public K getCurrentKey() throws IOException, InterruptedException {
+ return keys[index];
+ }
+ @Override
+ public V getCurrentValue() throws IOException, InterruptedException {
+ return values[index];
+ }
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return (float)index / keys.length;
+ }
+ @Override
+ public void close() throws IOException {
+ }
+ }
- public float getProgress() {
- return 0.0f;
+ public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
+ ArrayList<InputSplit> ret = new ArrayList<InputSplit>(numSplits);
+ for (int i = 0; i < numSplits; ++i) {
+ ret.add(new EmptySplit());
}
+ return ret;
}
- public RecordReader<Text, LongWritable> createRecordReader(InputSplit split,
- TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new RandomRecordReader(((FileSplit) split).getPath());
+ @Override
+ public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ int taskId = context.getTaskAttemptID().getTaskID().getId();
+ int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
+ int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
+ int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
+
+ taskId = taskId + iteration * numMapTasks;
+ numMapTasks = numMapTasks * numIterations;
+
+ long chainId = Math.abs(new Random().nextLong());
+ chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
+ LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};
+
+ return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
}
}
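
Folding the round number into the task id above is what keeps chain ids unique across import
rounds as well as across tasks: the global id taskId + iteration * numMapTasks is distinct for
every (task, round) pair, and the modulus is widened to numMapTasks * numIterations to match.
A small self-check of that arithmetic, as a hedged sketch (this snippet is illustrative and
not part of the patch):

    int numMapTasks = 4, numIterations = 3;
    int total = numMapTasks * numIterations;
    java.util.Random rand = new java.util.Random();
    for (int iteration = 0; iteration < numIterations; iteration++) {
      for (int taskId = 0; taskId < numMapTasks; taskId++) {
        int globalId = taskId + iteration * numMapTasks; // 0..11, all distinct
        long raw = Math.abs(rand.nextLong());
        long chainId = raw - (raw % total) + globalId;
        assert chainId % total == globalId;              // residue identifies the task
      }
    }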
@@ -273,19 +288,21 @@ public class IntegrationTestBulkLoad ext
* All lists should be CHAIN_LENGTH long.
*/
public static class LinkedListCreationMapper
- extends Mapper<Text, LongWritable, ImmutableBytesWritable, KeyValue> {
+ extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
private Random rand = new Random();
- protected void map(Text key, LongWritable value, Context context)
+ @Override
+ protected void map(LongWritable key, LongWritable value, Context context)
throws IOException, InterruptedException {
-
long chainId = value.get();
+ LOG.info("Starting mapper with chainId:" + chainId);
+
byte[] chainIdArray = Bytes.toBytes(chainId);
long currentRow = 0;
- long nextRow = Math.abs(rand.nextLong());
- int chainLength = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
+ long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
+ long nextRow = getNextRow(0, chainLength);
for (long i = 0; i < chainLength; i++) {
byte[] rk = Bytes.toBytes(currentRow);
@@ -305,9 +322,19 @@ public class IntegrationTestBulkLoad ext
context.write(new ImmutableBytesWritable(rk), dataKv);
// Move to the next row.
currentRow = nextRow;
- nextRow = Math.abs(rand.nextLong());
+ nextRow = getNextRow(i+1, chainLength);
}
}
+
+ /** Returns a unique row id within this chain for this index */
+ private long getNextRow(long index, long chainLength) {
+ long nextRow = Math.abs(rand.nextLong());
+ // use significant bits from the random number, but pad with index to ensure it is unique
+ // this also ensures that we do not reuse row = 0
+ // row collisions from multiple mappers are fine, since we guarantee unique chainIds
+ nextRow = nextRow - (nextRow % chainLength) + index;
+ return nextRow;
+ }
}
/**
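
getNextRow() applies the same residue trick within a single chain: each link's row id is
congruent to its index mod chainLength, so two different links in one chain can never land on
the same row, while the high bits stay random to spread rows across regions. Illustratively
(not patch code; values are arbitrary):

    java.util.Random rand = new java.util.Random();
    long chainLength = 500000L;
    long i = 7, j = 8;                        // two different link indices
    long rowI = Math.abs(rand.nextLong());
    rowI = rowI - (rowI % chainLength) + i;   // rowI % chainLength == i
    long rowJ = Math.abs(rand.nextLong());
    rowJ = rowJ - (rowJ % chainLength) + j;   // rowJ % chainLength == j
    assert rowI != rowJ;                      // distinct residues, distinct rows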
@@ -427,6 +454,7 @@ public class IntegrationTestBulkLoad ext
super(LinkKey.class, true);
}
+ @Override
public int compare(WritableComparable w1, WritableComparable w2) {
LinkKey k1 = (LinkKey) w1;
LinkKey k2 = (LinkKey) w2;
@@ -461,6 +489,7 @@ public class IntegrationTestBulkLoad ext
* and value for each.
*/
public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
+ @Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
long longRk = Bytes.toLong(value.getRow());
@@ -486,6 +515,7 @@ public class IntegrationTestBulkLoad ext
*/
public static class LinkedListCheckingReducer
extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
+ @Override
protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
throws java.io.IOException, java.lang.InterruptedException {
long next = -1L;
@@ -494,12 +524,13 @@ public class IntegrationTestBulkLoad ext
for (LinkChain lc : values) {
if (next == -1) {
- if (lc.getRk() != 0L) throw new RuntimeException("Chains should all start at 0 rk");
+ if (lc.getRk() != 0L) throw new RuntimeException("Chains should all start at 0 rk"
+ + ". Chain:" + key.chainId + ", order:" + key.order);
next = lc.getNext();
} else {
if (next != lc.getRk())
- throw new RuntimeException("Missing a link in the chain. Expecthing " +
- next + " got " + lc.getRk());
+ throw new RuntimeException("Missing a link in the chain. Expecting " +
+ next + " got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order);
next = lc.getNext();
}
count++;
@@ -508,7 +539,7 @@ public class IntegrationTestBulkLoad ext
int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
if (count != expectedChainLen)
throw new RuntimeException("Chain wasn't the correct length. Expected " +
- expectedChainLen + " got " + count);
+ expectedChainLen + " got " + count + ". Chain:" + key.chainId + ", order:" + key.order);
}
}
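
The reducer's check amounts to walking each chain's links in sort order and asserting that
every link's row key equals its predecessor's next pointer; the richer messages above now also
name the failing chain and order. The invariant, rendered standalone (illustrative only;
sortedLinks is a hypothetical iterable over one chain's links):

    // link[0].rk must be 0, and for k > 0: link[k].rk == link[k-1].next
    long next = -1L;
    for (LinkChain lc : sortedLinks) {
      if (next == -1L) assert lc.getRk() == 0L;
      else             assert lc.getRk() == next;
      next = lc.getNext();
    }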
@@ -519,6 +550,7 @@ public class IntegrationTestBulkLoad ext
* @throws InterruptedException
*/
private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
+ LOG.info("Running check");
Configuration conf = getConf();
String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTimeMillis();
Path p = util.getDataTestDirOnTestFS(jobName);
@@ -536,7 +568,7 @@ public class IntegrationTestBulkLoad ext
s.addFamily(SORT_FAM);
s.setMaxVersions(1);
s.setCacheBlocks(false);
- s.setBatch(100);
+ s.setBatch(1000);
TableMapReduceUtil.initTableMapperJob(
Bytes.toBytes(getTablename()),
@@ -575,9 +607,35 @@ public class IntegrationTestBulkLoad ext
}
}
+ private static final String OPT_LOAD = "load";
+ private static final String OPT_CHECK = "check";
+
+ private boolean load = false;
+ private boolean check = false;
+
+ @Override
+ protected void addOptions() {
+ super.addOptions();
+ super.addOptNoArg(OPT_CHECK, "Run check only");
+ super.addOptNoArg(OPT_LOAD, "Run load only");
+ }
+
+ @Override
+ protected void processOptions(CommandLine cmd) {
+ super.processOptions(cmd);
+ check = cmd.hasOption(OPT_CHECK);
+ load = cmd.hasOption(OPT_LOAD);
+ }
+
@Override
public int runTestFromCommandLine() throws Exception {
- runCheck();
+ if (load) {
+ runLoad();
+ } else if (check) {
+ runCheck();
+ } else {
+ testBulkLoad();
+ }
return 0;
}
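
With the new flags, the load and verify phases can be run independently from the command line;
with no flag the tool runs both, exactly as the JUnit test does. A hedged usage sketch,
assuming the usual AbstractHBaseTool/ToolRunner entry point for HBase integration tests:

    // Run only the load phase (flag names are the ones added by this patch):
    int ret = ToolRunner.run(HBaseConfiguration.create(),
        new IntegrationTestBulkLoad(), new String[] { "-load" });
    // ...later, verify the chains written by one or more load runs:
    ret = ToolRunner.run(HBaseConfiguration.create(),
        new IntegrationTestBulkLoad(), new String[] { "-check" });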