Posted to commits@hbase.apache.org by dm...@apache.org on 2011/08/31 22:02:06 UTC

svn commit: r1163781 - /hbase/trunk/src/docbkx/book.xml

Author: dmeil
Date: Wed Aug 31 20:02:06 2011
New Revision: 1163781

URL: http://svn.apache.org/viewvc?rev=1163781&view=rev
Log:
HBASE-4316 book.xml - overhauled mapreduce examples

Modified:
    hbase/trunk/src/docbkx/book.xml

Modified: hbase/trunk/src/docbkx/book.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/docbkx/book.xml?rev=1163781&r1=1163780&r2=1163781&view=diff
==============================================================================
--- hbase/trunk/src/docbkx/book.xml (original)
+++ hbase/trunk/src/docbkx/book.xml Wed Aug 31 20:02:06 2011
@@ -81,38 +81,199 @@
   <para>See <link xlink:href="http://hbase.org/apidocs/org/apache/hadoop/hbase/mapreduce/package-summary.html#package_description">HBase and MapReduce</link> up in javadocs.
   Start there.  Below is some additional help.</para>
   <section xml:id="splitter">
-  <title>The default HBase MapReduce Splitter</title>
-  <para>When <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html">TableInputFormat</link>,
-  is used to source an HBase table in a MapReduce job,
-  its splitter will make a map task for each region of the table.
-  Thus, if there are 100 regions in the table, there will be
-  100 map-tasks for the job - regardless of how many column families are selected in the Scan.</para>
+    <title>Map-Task Splitting</title>
+    <section xml:id="splitter.default">
+    <title>The Default HBase MapReduce Splitter</title>
+    <para>When <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html">TableInputFormat</link>
+    is used to source an HBase table in a MapReduce job,
+    its splitter will make a map task for each region of the table.
+    Thus, if there are 100 regions in the table, there will be
+    100 map-tasks for the job - regardless of how many column families are selected in the Scan.</para>
+    </section>
+    <section xml:id="splitter.custom">
+    <title>Custom Splitters</title>
+    <para>For those interested in implementing custom splitters, see the method <code>getSplits</code> in 
+    <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html">TableInputFormatBase</link>.
+    That is where the logic for map-task assignment resides.  
+    </para>
+    </section>
   </section>
   <section xml:id="mapreduce.example">
-  <title>HBase Input MapReduce Example</title>
-  <para>To use HBase as a MapReduce source,
-  the job would be configured via <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html">TableMapReduceUtil</link> in the following manner...
-	<programlisting>Job job = ...;	
+    <title>HBase MapReduce Examples</title>
+    <section xml:id="mapreduce.example.read">
+    <title>HBase MapReduce Read Example</title>
+    <para>The following is an example of using HBase as a MapReduce source in a read-only manner.  Specifically,
+    there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper.  The job would be defined
+    as follows...
+	<programlisting>
+Configuration config = HBaseConfiguration.create();
+Job job = new Job(config, "ExampleRead");
+job.setJarByClass(MyReadJob.class);     // class that contains mapper
+	
 Scan scan = new Scan();
-scan.setCaching(500);  // 1 is the default in Scan, which will be bad for MapReduce jobs
-scan.setCacheBlocks(false);  
-// Now set other scan attrs
+scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
+scan.setCacheBlocks(false);  // don't set to true for MR jobs
+// set other scan attrs
 ...
   
 TableMapReduceUtil.initTableMapperJob(
-  tableName,   		// input HBase table name
-  scan, 			// Scan instance to control CF and attribute selection
-  MyMapper.class,		// mapper
-  Text.class,		// reducer key 
-  LongWritable.class,	// reducer value
-  job			// job instance
-  );</programlisting>
+  tableName,        // input HBase table name
+  scan,             // Scan instance to control CF and attribute selection
+  MyMapper.class,   // mapper
+  null,             // mapper output key 
+  null,             // mapper output value
+  job);
+job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper
+	    
+boolean b = job.waitForCompletion(true);
+if (!b) {
+  throw new IOException("error with job!");
+}
+  </programlisting>
   ...and the mapper instance would extend <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapper.html">TableMapper</link>...
 	<programlisting>
-public class MyMapper extends TableMapper&lt;Text, LongWritable&gt; {
+public static class MyMapper extends TableMapper&lt;Text, Text&gt; {
+
   public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
-  // process data for the row from the Result instance.</programlisting>
-  	</para>
+    // process data for the row from the Result instance.
+  }
+}    
+    </programlisting>
+  	  </para>
+  	 </section>
+    <section xml:id="mapreduce.example.readwrite">
+    <title>HBase MapReduce Read/Write Example</title>
+    <para>The following is an example of using HBase both as a source and as a sink with MapReduce. 
+    This example will simply copy data from one table to another.
+    <programlisting>
+Configuration config = HBaseConfiguration.create();
+Job job = new Job(config,"ExampleReadWrite");
+job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper
+	        	        
+Scan scan = new Scan();
+scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
+scan.setCacheBlocks(false);  // don't set to true for MR jobs
+// set other scan attrs
+	        
+TableMapReduceUtil.initTableMapperJob(
+	sourceTable,      // input table
+	scan,	          // Scan instance to control CF and attribute selection
+	MyMapper.class,   // mapper class
+	null,	          // mapper output key
+	null,	          // mapper output value
+	job);
+TableMapReduceUtil.initTableReducerJob(
+	targetTable,      // output table
+	null,             // reducer class
+	job);
+job.setNumReduceTasks(0);
+	        
+boolean b = job.waitForCompletion(true);
+if (!b) {
+	throw new IOException("error with job!");
+}
+    </programlisting>
+	An explanation is required of what <classname>TableMapReduceUtil</classname> is doing, especially with the reducer.  
+	<link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.html">TableOutputFormat</link> is being used
+	as the outputFormat class, and several parameters are being set on the config (e.g., TableOutputFormat.OUTPUT_TABLE).  The
+	reducer output key and value classes are also set, to <classname>ImmutableBytesWritable</classname> and <classname>Writable</classname> respectively.
+	These could be set by the programmer on the job and conf, but <classname>TableMapReduceUtil</classname> tries to make things easier.    
+	<para>The following is the example mapper, which will create a <classname>Put</classname> matching the input <classname>Result</classname>
+	and emit it.  Note:  this is what the CopyTable utility does.
+	</para>
+    <programlisting>
+public static class MyMapper extends TableMapper&lt;ImmutableBytesWritable, Put&gt;  {
+
+  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
+    // this example is just copying the data from the source table...
+    context.write(row, resultToPut(row, value));
+  }
+
+  private static Put resultToPut(ImmutableBytesWritable key, Result result)
+      throws IOException {
+    Put put = new Put(key.get());
+    for (KeyValue kv : result.raw()) {
+      put.add(kv);
+    }
+    return put;
+  }
+}
+    </programlisting>
+    <para>There isn't actually a reducer step, so <classname>TableOutputFormat</classname> takes care of sending the <classname>Put</classname>
+    to the target table. 
+    </para>
+    <para>This is just an example; developers could choose not to use <classname>TableOutputFormat</classname> and connect to the 
+    target table themselves.
+    </para>
+    </para>
+    </section>
+    <section xml:id="mapreduce.example.summary">
+    <title>HBase MapReduce Summary Example</title>
+    <para>The following example uses HBase as a MapReduce source and sink with a summarization step.  This example will 
+    count the number of occurrences of each distinct value in a table and write those summarized counts to another table.
+    <programlisting>
+Configuration config = HBaseConfiguration.create();
+Job job = new Job(config,"ExampleSummary");
+job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer
+	        
+Scan scan = new Scan();
+scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
+scan.setCacheBlocks(false);  // don't set to true for MR jobs
+// set other scan attrs
+	        
+TableMapReduceUtil.initTableMapperJob(
+	sourceTable,        // input table
+	scan,               // Scan instance to control CF and attribute selection
+	MyMapper.class,     // mapper class
+	Text.class,         // mapper output key
+	IntWritable.class,  // mapper output value
+	job);
+TableMapReduceUtil.initTableReducerJob(
+	targetTable,        // output table
+	MyReducer.class,    // reducer class
+	job);
+job.setNumReduceTasks(1);   // at least one, adjust as required
+	    
+boolean b = job.waitForCompletion(true);
+if (!b) {
+	throw new IOException("error with job!");
+}    
+    </programlisting>
+    In this example mapper, a column with a String value is chosen as the value to summarize on.  
+    This value is used as the key to emit from the mapper, and an <classname>IntWritable</classname> represents an instance counter.
+    <programlisting>
+public static class MyMapper extends TableMapper&lt;Text, IntWritable&gt;  {
+
+  private final IntWritable ONE = new IntWritable(1);
+  private Text text = new Text();
+
+  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
+    String val = Bytes.toString(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1")));  // decodes as UTF-8
+    text.set(val);     // we can only emit Writables...
+
+    context.write(text, ONE);
+  }
+}
+    </programlisting>
+    In the reducer, the "ones" are counted (just like any other MR example that does this), and then a <classname>Put</classname> is emitted.
+    <programlisting>
+public static class MyReducer extends TableReducer&lt;Text, IntWritable, ImmutableBytesWritable&gt;  {
+
+  public void reduce(Text key, Iterable&lt;IntWritable&gt; values, Context context) throws IOException, InterruptedException {
+    int i = 0;
+    for (IntWritable val : values) {
+      i += val.get();
+    }
+    Put put = new Put(Bytes.toBytes(key.toString()));
+    put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));
+
+    context.write(null, put);
+  }
+}
+    </programlisting>
+    
+    </para>
+    </section>
    </section>
    <section xml:id="mapreduce.htable.access">
    <title>Accessing Other HBase Tables in a MapReduce Job</title>
@@ -123,10 +284,16 @@ public class MyMapper extends TableMappe
 	<programlisting>public class MyMapper extends TableMapper&lt;Text, LongWritable&gt; {
   private HTable myOtherTable;
 
-  @Override
   public void setup(Context context) {
     myOtherTable = new HTable("myOtherTable");
-  }</programlisting>
+  }
+  
+  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
+	// process Result...
+	// use 'myOtherTable' for lookups
+  }
+  
+  </programlisting>
    </para>
     </section>
   <section xml:id="mapreduce.specex">
@@ -381,7 +548,7 @@ admin.enableTable(table);               
       </title>
       <para>A secondary index could be created in another table which is periodically updated via a MapReduce job.  The job could be executed intra-day, but depending on 
       load-strategy it could still potentially be out of sync with the main data table.</para>
-      <para>See <xref linkend="mapreduce"/> for more information.</para>
+      <para>See <xref linkend="mapreduce.example.readwrite"/> for more information.</para>
     </section>
     <section xml:id="secondary.indexes.dualwrite">
       <title>
@@ -396,7 +563,7 @@ admin.enableTable(table);               
       </title>
       <para>Where time-ranges are very wide (e.g., year-long report) and where the data is voluminous, summary tables are a common approach.
       These would be generated with MapReduce jobs into another table.</para>
-      <para>See <xref linkend="mapreduce"/> for more information.</para>
+      <para>See <xref linkend="mapreduce.example.summary"/> for more information.</para>
     </section>
     <section xml:id="secondary.indexes.coproc">
       <title>