You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2013/10/29 00:09:53 UTC

svn commit: r1536551 - /hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java

Author: enis
Date: Mon Oct 28 23:09:52 2013
New Revision: 1536551

URL: http://svn.apache.org/r1536551
Log:
HBASE-9759 Prevent random number collision in IntegrationTestBulkLoad

Modified:
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java?rev=1536551&r1=1536550&r2=1536551&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java Mon Oct 28 23:09:52 2013
@@ -29,7 +29,10 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.commons.cli.CommandLine;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -48,7 +51,7 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
@@ -61,7 +64,6 @@ import org.apache.hadoop.mapreduce.Parti
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
@@ -101,6 +103,8 @@ import org.junit.experimental.categories
 @Category(IntegrationTests.class)
 public class IntegrationTestBulkLoad extends IntegrationTestBase {
 
+  private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);
+
   private static byte[] CHAIN_FAM = Bytes.toBytes("L");
   private static byte[] SORT_FAM  = Bytes.toBytes("S");
   private static byte[] DATA_FAM  = Bytes.toBytes("D");
@@ -114,18 +118,24 @@ public class IntegrationTestBulkLoad ext
   private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
   private static int NUM_IMPORT_ROUNDS = 1;
 
+  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";
 
   private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
   private static String TABLE_NAME = "IntegrationTestBulkLoad";
 
   @Test
   public void testBulkLoad() throws Exception {
+    runLoad();
+    runCheck();
+  }
+
+  public void runLoad() throws Exception {
     setupTable();
     int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
+    LOG.info("Running load with numIterations:" + numImportRounds);
     for (int i = 0; i < numImportRounds; i++) {
       runLinkedListMRJob(i);
     }
-    runCheck();
   }
 
   private byte[][] getSplits(int numRegions) {
@@ -156,13 +166,14 @@ public class IntegrationTestBulkLoad ext
 
     conf.setBoolean("mapreduce.map.speculative", false);
     conf.setBoolean("mapreduce.reduce.speculative", false);
+    conf.setInt(ROUND_NUM_KEY, iteration);
 
     Job job = new Job(conf);
 
     job.setJobName(jobName);
 
     // set the input format so that we can create map tasks with no data input.
-    job.setInputFormatClass(RandomInputFormat.class);
+    job.setInputFormatClass(ITBulkLoadInputFormat.class);
 
     // Set the mapper classes.
     job.setMapperClass(LinkedListCreationMapper.class);
@@ -194,74 +205,78 @@ public class IntegrationTestBulkLoad ext
     util.getTestFileSystem().delete(p, true);
   }
 
-  /**
-   * Class to generate splits.  Each split gets a dummy split file.  The associated
-   * RecordReader generates a single random number.
-   *
-   * This class is adapted from Hadoop tests.
-   */
-  static class RandomInputFormat extends InputFormat<Text, LongWritable> {
-    public List<InputSplit> getSplits(JobContext job) throws IOException {
-      List<InputSplit> result = new ArrayList<InputSplit>();
-      int numSplits = job.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
-      for (int i = 0; i < numSplits; ++i) {
-        result.add(new FileSplit(new Path("/tmp", "dummy-split-" + i), 0, 1, null));
-      }
-      return result;
-    }
-
-    /**
-     * RecordReader that doesn't read anything.  Instead it generates a single random number.
-     * This is useful for debugging or starting map tasks with no data inpput.
-     *
-     * This class is adapted from Hadoop tests.
-     */
-    static class RandomRecordReader extends RecordReader<Text, LongWritable> {
-      Path name;
-      Text key = null;
-      LongWritable value = new LongWritable();
-
-      public RandomRecordReader(Path p) {
-        name = p;
-      }
-
-      public void initialize(InputSplit split,
-                             TaskAttemptContext context)
-          throws IOException, InterruptedException {
-
-      }
-
-      public boolean nextKeyValue() {
-        if (name != null) {
-          key = new Text();
-          key.set(name.getName());
-          name = null;
-          value.set(new Random().nextLong());
-          return true;
-        }
-        return false;
-      }
-
-      public Text getCurrentKey() {
-        return key;
-      }
+  public static class EmptySplit extends InputSplit implements Writable {
+    @Override
+    public void write(DataOutput out) throws IOException { }
+    @Override
+    public void readFields(DataInput in) throws IOException { }
+    @Override
+    public long getLength() { return 0L; }
+    @Override
+    public String[] getLocations() { return new String[0]; }
+  }
 
-      public LongWritable getCurrentValue() {
-        return value;
-      }
+  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
+    private int index = -1;
+    private K[] keys;
+    private V[] values;
 
-      public void close() {
-      }
+    public FixedRecordReader(K[] keys, V[] values) {
+      this.keys = keys;
+      this.values = values;
+    }
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
+    InterruptedException { }
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return ++index < keys.length;
+    }
+    @Override
+    public K getCurrentKey() throws IOException, InterruptedException {
+      return keys[index];
+    }
+    @Override
+    public V getCurrentValue() throws IOException, InterruptedException {
+      return values[index];
+    }
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return (float)index / keys.length;
+    }
+    @Override
+    public void close() throws IOException {
+    }
+  }
 
-      public float getProgress() {
-        return 0.0f;
+  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
+      ArrayList<InputSplit> ret = new ArrayList<InputSplit>(numSplits);
+      for (int i = 0; i < numSplits; ++i) {
+        ret.add(new EmptySplit());
       }
+      return ret;
     }
 
-    public RecordReader<Text, LongWritable> createRecordReader(InputSplit split,
-                                                               TaskAttemptContext context)
-        throws IOException, InterruptedException {
-      return new RandomRecordReader(((FileSplit) split).getPath());
+    @Override
+    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
+      TaskAttemptContext context)
+          throws IOException, InterruptedException {
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
+      int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
+      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
+
+      taskId = taskId + iteration * numMapTasks;
+      numMapTasks = numMapTasks * numIterations;
+
+      long chainId = Math.abs(new Random().nextLong());
+      chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
+      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};
+
+      return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
     }
   }
 
@@ -273,19 +288,21 @@ public class IntegrationTestBulkLoad ext
    * All lists should be CHAIN_LENGTH long.
    */
   public static class LinkedListCreationMapper
-      extends Mapper<Text, LongWritable, ImmutableBytesWritable, KeyValue> {
+      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
 
     private Random rand = new Random();
 
-    protected void map(Text key, LongWritable value, Context context)
+    @Override
+    protected void map(LongWritable key, LongWritable value, Context context)
         throws IOException, InterruptedException {
-
       long chainId = value.get();
+      LOG.info("Starting mapper with chainId:" + chainId);
+
       byte[] chainIdArray = Bytes.toBytes(chainId);
       long currentRow = 0;
-      long nextRow = Math.abs(rand.nextLong());
 
-      int chainLength = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
+      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
+      long nextRow = getNextRow(0, chainLength);
 
       for (long i = 0; i < chainLength; i++) {
         byte[] rk = Bytes.toBytes(currentRow);
@@ -305,9 +322,19 @@ public class IntegrationTestBulkLoad ext
         context.write(new ImmutableBytesWritable(rk), dataKv);
         // Move to the next row.
         currentRow = nextRow;
-        nextRow = Math.abs(rand.nextLong());
+        nextRow = getNextRow(i+1, chainLength);
       }
     }
+
+    /** Returns a unique row id within this chain for this index */
+    private long getNextRow(long index, long chainLength) {
+      long nextRow = Math.abs(rand.nextLong());
+      // use significant bits from the random number, but pad with index to ensure it is unique
+      // this also ensures that we do not reuse row = 0
+      // row collisions from multiple mappers are fine, since we guarantee unique chainIds
+      nextRow = nextRow - (nextRow % chainLength) + index;
+      return nextRow;
+    }
   }
 
   /**
@@ -427,6 +454,7 @@ public class IntegrationTestBulkLoad ext
       super(LinkKey.class, true);
     }
 
+    @Override
     public int compare(WritableComparable w1, WritableComparable w2) {
       LinkKey k1 = (LinkKey) w1;
       LinkKey k2 = (LinkKey) w2;
@@ -461,6 +489,7 @@ public class IntegrationTestBulkLoad ext
    * and value for each.
    */
   public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
+    @Override
     protected void map(ImmutableBytesWritable key, Result value, Context context)
         throws IOException, InterruptedException {
       long longRk = Bytes.toLong(value.getRow());
@@ -486,6 +515,7 @@ public class IntegrationTestBulkLoad ext
    */
   public static class LinkedListCheckingReducer
       extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
+    @Override
     protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
         throws java.io.IOException, java.lang.InterruptedException {
       long next = -1L;
@@ -494,12 +524,13 @@ public class IntegrationTestBulkLoad ext
       for (LinkChain lc : values) {
 
         if (next == -1) {
-          if (lc.getRk() != 0L) throw new RuntimeException("Chains should all start at 0 rk");
+          if (lc.getRk() != 0L) throw new RuntimeException("Chains should all start at 0 rk"
+            + ". Chain:" + key.chainId + ", order:" + key.order);
           next = lc.getNext();
         } else {
           if (next != lc.getRk())
-            throw new RuntimeException("Missing a link in the chain. Expecthing " +
-                next + " got " + lc.getRk());
+            throw new RuntimeException("Missing a link in the chain. Expecting " +
+                next + " got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order);
           next = lc.getNext();
         }
         count++;
@@ -508,7 +539,7 @@ public class IntegrationTestBulkLoad ext
       int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
       if (count != expectedChainLen)
         throw new RuntimeException("Chain wasn't the correct length.  Expected " +
-            expectedChainLen + " got " + count);
+            expectedChainLen + " got " + count + ". Chain:" + key.chainId + ", order:" + key.order);
     }
   }
 
@@ -519,6 +550,7 @@ public class IntegrationTestBulkLoad ext
    * @throws InterruptedException
    */
   private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
+    LOG.info("Running check");
     Configuration conf = getConf();
     String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTimeMillis();
     Path p = util.getDataTestDirOnTestFS(jobName);
@@ -536,7 +568,7 @@ public class IntegrationTestBulkLoad ext
     s.addFamily(SORT_FAM);
     s.setMaxVersions(1);
     s.setCacheBlocks(false);
-    s.setBatch(100);
+    s.setBatch(1000);
 
     TableMapReduceUtil.initTableMapperJob(
         Bytes.toBytes(getTablename()),
@@ -575,9 +607,35 @@ public class IntegrationTestBulkLoad ext
     }
   }
 
+  private static final String OPT_LOAD = "load";
+  private static final String OPT_CHECK = "check";
+
+  private boolean load = false;
+  private boolean check = false;
+
+  @Override
+  protected void addOptions() {
+    super.addOptions();
+    super.addOptNoArg(OPT_CHECK, "Run check only");
+    super.addOptNoArg(OPT_LOAD, "Run load only");
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    super.processOptions(cmd);
+    check = cmd.hasOption(OPT_CHECK);
+    load = cmd.hasOption(OPT_LOAD);
+  }
+
   @Override
   public int runTestFromCommandLine() throws Exception {
-    runCheck();
+    if (load) {
+      runLoad();
+    } else if (check) {
+      runCheck();
+    } else {
+      testBulkLoad();
+    }
     return 0;
   }