You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2012/11/16 01:01:31 UTC

svn commit: r1410116 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: liyin
Date: Fri Nov 16 00:01:30 2012
New Revision: 1410116

URL: http://svn.apache.org/viewvc?rev=1410116&view=rev
Log:
[HBASE-7164] Using HFileOutputFormat as MapOutputFormat

Author: liyintang

Summary:
We mainly use HFileOutputFormat as Reducer's output format. It depends on the MR framework to sort the values before written into HFiles.

However, if the mapper task can guarantee the total order for its output value, it will be more efficient to use HFileOutputFormat as MapOutputFormat.

The motivation comes from ODS use cases. The rollup job takes the TableInputFormat as MapInputFormation,  and then each mapper task rolls up some key values and emits the result value to output format.

Since all the result values are emitted in order, it would be more convenient to use HFileOutputFormat directly and bulk upload all the output files into HBase as the final step. By this approach, it would reduce the performance impact of roll up jobs into minimal.

BTW, the new unit test, TestTableInputFormatWithHFileOutputFormat, simulates the ODS rollup job's behavior.

Test Plan: Unit tests

Reviewers: kannan

Reviewed By: kannan

CC: vinodv, hbase-eng@, erling

Differential Revision: https://phabricator.fb.com/D631227

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatWithHFileOutputFormat.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1410116&r1=1410115&r2=1410116&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Fri Nov 16 00:01:30 2012
@@ -283,6 +283,34 @@ public class HFileOutputFormat extends F
   }
 
   /**
+   * It is safe to set up  this job only if each mapper emits values for one region only 
+   * and in a sorted order.
+   * @param job
+   * @param table
+   * @throws IOException
+   */
+  public static void configAsMapOutputFormat(Job job, HTable table) throws IOException {
+    LOG.warn("Set up the HFileOutputFormat as MapperOutputFormat." +
+    		"It is the mapper task's responsibility to make sure that each mapper emits values " +
+    		"for one region only and in a sorted order. !");
+    
+    Configuration conf = job.getConfiguration();
+    if (!KeyValue.class.equals(job.getMapOutputValueClass())) {
+      LOG.error("Only support the KeyValue.class as MapOutputValueClass so far!");
+      System.exit (-1);
+    }
+
+    // Set compression algorithms based on column families
+    configureCompression(table, conf);
+    // Set BloomFilter type based on column families and
+    // relevant parameters.
+    configureBloomFilter(table, conf);
+    
+    LOG.info("Configured the HFileOutputFormat as MapperOutputFormat for table: " +
+        table.getTableDescriptor().getNameAsString());
+  }
+
+  /**
    * Configure a MapReduce Job to perform an incremental load into the given
    * table. This
    * <ul>
@@ -409,7 +437,7 @@ public class HFileOutputFormat extends F
     conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
   }
 
-  private static void configureBloomFilter(HTable table, Configuration conf)
+  protected static void configureBloomFilter(HTable table, Configuration conf)
   throws IOException {
     // get conf information needed by BloomFilter
     Configuration tableConf = table.getConfiguration();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1410116&r1=1410115&r2=1410116&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Fri Nov 16 00:01:30 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -82,6 +83,37 @@ public class TableMapReduceUtil {
   }
 
   /**
+   * Initialize a Map only job which takes TableInputFormat as MapInputFormat and
+   * HFileOutputFormat as MapOutputFormat.
+   * 
+   * It is safe to set up  this job only if each mapper emits values for one region only 
+   * and in a sorted order.
+   *
+   * @param table
+   * @param scan
+   * @param mapper
+   * @param job
+   * @throws IOException
+   */
+  public static void initHTableInputAndHFileOutputMapperJob(String table, Scan scan, 
+      Class<? extends TableMapper> mapper, Job job) throws IOException {
+    Configuration conf = job.getConfiguration();
+    job.setInputFormatClass(TableInputFormat.class);
+    
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(KeyValue.class);
+    job.setOutputFormatClass(HFileOutputFormat.class);
+    
+    HFileOutputFormat.configAsMapOutputFormat(job, new HTable(conf, table));
+    
+    job.setMapperClass(mapper);
+    job.setNumReduceTasks(0);
+    
+    job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
+    job.getConfiguration().set(TableInputFormat.SCAN, convertScanToString(scan));
+  }
+  
+  /**
    * Writes the given scan into a Base64 encoded string.
    *
    * @param scan  The scan to write out.

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatWithHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatWithHFileOutputFormat.java?rev=1410116&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatWithHFileOutputFormat.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatWithHFileOutputFormat.java Fri Nov 16 00:01:30 2012
@@ -0,0 +1,163 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestTableInputFormatWithHFileOutputFormat {
+  static final Log LOG = LogFactory.getLog(TestTableInputFormatScan.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  static final String TABLENAME = "testMultipleHLogs";
+  static final byte[] CF1 = Bytes.toBytes("cf1");
+  static final byte[] CF2 = Bytes.toBytes("cf2");
+  static final byte[][] FAMILIES = new byte[][]{CF1, CF2};
+  static final int REGION_NUM = 20;
+  static final byte[] QAULIFIER = Bytes.toBytes("q");
+  static final byte[] VALUE = Bytes.toBytes("v");
+  static final Path OUTPUTPATH = new Path("TEST-OUTPUT");
+  static HTable htable;
+  static Configuration conf = TEST_UTIL.getConfiguration();
+
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // switch TIF to log at DEBUG level
+    TEST_UTIL.enableDebug(TableInputFormat.class);
+    TEST_UTIL.enableDebug(TableInputFormatBase.class);
+    // start mini hbase cluster
+    TEST_UTIL.startMiniCluster(3);
+    // start MR cluster
+    TEST_UTIL.startMiniMapReduceCluster();
+    
+  }
+  
+  @Test
+  public void testTableInputFormatWithHFileOutputFormat() throws IOException,
+  InterruptedException, ClassNotFoundException {
+    // Create table and load some data into CF1
+    loadTable();
+    
+    // Run a MapReduce with TableInputFormat and HFileOutputFormat in order to 
+    // put all the kv from CF1 to CF2
+    launchMRJob();
+    
+    // Bulk upload the MR output file into HBase
+    new LoadIncrementalHFiles(conf).doBulkLoad(OUTPUTPATH, htable);
+    
+    // Verify the table that all the rows have the same data for both CF1 and CF2
+    verifyTable();
+  }
+  
+  private void loadTable() throws IOException{
+    final int actualStartKey = 0;
+    final int actualEndKey = Integer.MAX_VALUE;
+    final int keysPerRegion = (actualEndKey - actualStartKey) / REGION_NUM;
+    final int splitStartKey = actualStartKey + keysPerRegion;
+    final int splitEndKey = actualEndKey - keysPerRegion;
+    final String keyFormat = "%08x";
+    htable = TEST_UTIL.createTable(Bytes.toBytes(TABLENAME),
+        FAMILIES,
+        1,
+        Bytes.toBytes(String.format(keyFormat, splitStartKey)),
+        Bytes.toBytes(String.format(keyFormat, splitEndKey)),
+        REGION_NUM);
+    
+    // Put some data for each Region
+    for (byte[] row : htable.getStartKeys()) {
+      Put p = new Put(row);
+      p.add(CF1, QAULIFIER, VALUE);
+      htable.put(p);
+      htable.flushCommits();
+    }
+  }
+
+  private void launchMRJob() throws IOException, InterruptedException, ClassNotFoundException {
+    // Create the scan object
+    Scan scan = new Scan();
+    scan.addFamily(CF1);
+    
+    // Create and initialize the MR job
+    Job job = new Job(conf, "process column contents");
+    FileOutputFormat.setOutputPath(job, OUTPUTPATH);
+    
+    TableMapReduceUtil.initHTableInputAndHFileOutputMapperJob(
+        TABLENAME, 
+        scan, 
+        TestTableInputFormatWithHFileOutputFormat.DummyMapper.class, 
+        job);
+
+    // Wait for job completion
+    job.waitForCompletion(true);
+    
+    // Sanity check the output file
+    FileStatus[] files = TEST_UTIL.getDFSCluster().getFileSystem().listStatus(
+        new Path(OUTPUTPATH, Bytes.toString(CF2)));
+    for(FileStatus file : files) {
+      System.out.println(file.getPath());
+    }
+    Assert.assertEquals(REGION_NUM, files.length);
+    LOG.info("After map/reduce completion");
+  }
+  
+  private void verifyTable() throws IOException {
+    Scan scan = new Scan();
+    scan.addFamily(CF1);
+    scan.addFamily(CF2);
+    
+    ResultScanner s = htable.getScanner(scan);
+    Result result = null;
+    int count = 0;
+    while((result = s.next()) != null) {
+      count++;
+      
+      Assert.assertEquals(2, result.list().size());
+      KeyValue kvFromCF1 = result.list().get(0);
+      KeyValue kvFromCF2 = result.list().get(1);
+      
+      Assert.assertTrue(Bytes.compareTo(kvFromCF1.getFamily(), CF1) == 0);
+      Assert.assertTrue(Bytes.compareTo(kvFromCF2.getFamily(), CF2) == 0);
+      
+      Assert.assertTrue(Bytes.compareTo(kvFromCF1.getRow(), kvFromCF2.getRow()) == 0);
+      Assert.assertTrue(Bytes.compareTo(kvFromCF1.getQualifier(), kvFromCF2.getQualifier()) == 0);
+      Assert.assertTrue(kvFromCF1.getTimestamp() == kvFromCF2.getTimestamp());
+      Assert.assertTrue(Bytes.compareTo(kvFromCF1.getValue(), kvFromCF2.getValue()) == 0);
+    }
+    
+    Assert.assertEquals(REGION_NUM, count);
+  }
+  
+  public static class DummyMapper extends TableMapper<ImmutableBytesWritable, KeyValue> {
+    private KeyValue previousKV = null;
+    public void map(ImmutableBytesWritable key, Result result, Context context)
+    throws IOException, InterruptedException {
+      Assert.assertEquals(1, result.size());
+      KeyValue tmp = result.list().get(0);
+      KeyValue currentKV = new KeyValue(tmp.getRow(), CF2, tmp.getQualifier(), tmp.getTimestamp(), tmp.getValue());
+      
+      // Sanity check that the output key value is sorted
+      if (previousKV != null) {
+        Assert.assertTrue(KeyValue.COMPARATOR.compare(currentKV, previousKV) >= 0);
+      }
+      previousKV = currentKV;
+      System.out.println("current KV: " + Bytes.toStringBinary(currentKV.getBuffer()));
+      context.write(key, currentKV);
+    }
+  }
+}