Posted to user@hbase.apache.org by stack <st...@duboce.net> on 2008/07/01 19:38:14 UTC

Re: Map Reduce over HBase - sample code

Comments in-line below:

Naama Kraus wrote:
> Here is an updated code
>
> Naama
>
> /**
>  * <pre>
>  * 'Toy tables' for experiencing with MapReduce over HBase
>   
Do you mean 'experimenting' in the above?

....
>   public void create() throws IOException {
>   

Where does this method get called?  I don't see it.


>     System.out.println("Grades Table populated");
>   

Do you want to set up a logger to do the outputting instead?  See the 
head of (most) hbase classes for example.  Look for 'LOG'.
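
The logger suggestion can be sketched roughly as follows. This is a minimal, hypothetical example: the class name GradesTableLoggingSketch and the helper populatedMessage are invented for illustration, and java.util.logging is swapped in for the commons-logging Log that HBase classes declare, so that the snippet runs with only the JDK.

```java
import java.util.logging.Logger;

// Hypothetical class; java.util.logging stands in for the
// commons-logging Log/LogFactory pair used in HBase classes.
class GradesTableLoggingSketch {
  // One static logger per class, named after the class.
  static final Logger LOG =
      Logger.getLogger(GradesTableLoggingSketch.class.getName());

  // Builds the message so it can be logged (and checked) in one place.
  static String populatedMessage(String tableName) {
    return tableName + " table populated";
  }

  public static void main(String[] args) {
    // Takes the place of System.out.println("Grades Table populated");
    LOG.info(populatedMessage("Grades"));
  }
}
```

With a logger, the output level and destination can be changed through configuration rather than by editing the code.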


>   }
> }
>
>
> ====================================================
>
> /**
>  * A map reduce job over {@link GradesTable}
>  * The job produces for each course the average grade in that course.
>  * It puts the average in a separate table which holds course statistics.
>  *
>  */
> public class GradesTableMapReduce  extends Configured implements Tool {
>
>   /**
>    * Map a row to {key, value} pairs.
>    * Emit a {course, grade} pair for each course grade appearing in the
>    * student row.
>    * E.g. Sara {Math:62, Art:45, Sports:87} -> {Math, 62}, {Art, 45},
>    * {Sports, 87}
>    *
>    */
>   public static class GradesTableMap extends TableMap<Text, IntWritable> {
>
>     @Override
>     public void map(HStoreKey key, MapWritable value,
>         OutputCollector<Text, IntWritable> output, Reporter reporter)
>         throws IOException {
>
>       // Walk through the columns
>       for (Map.Entry<Writable, Writable> e: value.entrySet()) {
>         // Column name is course name
>         Text course = (Text) e.getKey();
>         // Remove the family prefix
>         String courseStr = course.toString();
>         courseStr =
>           courseStr.substring(courseStr.indexOf(':') + 1);
>   

There may be utility in HStoreKey to do the above stripping of the 
column family (getQualifier?).
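
For reference, the stripping itself amounts to the following. This is a plain-Java sketch with a hypothetical class and method name (ColumnNameSketch, qualifierOf); it does not use any HBase utility and only shows what such a helper would have to do with a name like Course:Math.

```java
// Hypothetical helper, not part of HBase.
class ColumnNameSketch {
  // Returns the text after the first ':' (the column qualifier),
  // or the whole name when there is no family prefix.
  static String qualifierOf(String column) {
    int sep = column.indexOf(':');
    return sep < 0 ? column : column.substring(sep + 1);
  }

  public static void main(String[] args) {
    System.out.println(qualifierOf("Course:Math"));
    System.out.println(qualifierOf("Stats:Average"));
  }
}
```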

>         course = new Text(courseStr);
>         byte [] gradeInBytes =
>           ((ImmutableBytesWritable) e.getValue()).get();
>         DataInputStream in =
>           new DataInputStream(new ByteArrayInputStream(gradeInBytes));
>         IntWritable grade = new IntWritable();
>         grade.readFields(in);
>   

You could have used Writables.getWritable above and saved yourself a few 
lines (Not important).
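
To make the byte handling concrete, here is a small round trip written with plain java.io streams. It is only a sketch under an assumption: the grade is stored as a 4-byte big-endian int, which as far as I know is what IntWritable writes. The class GradeBytesSketch and its encode/decode methods are hypothetical names and are not HBase or Hadoop API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper showing the cell-value round trip.
class GradeBytesSketch {
  // Serializes a grade to 4 bytes, big-endian.
  static byte[] encode(int grade) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(baos)) {
      out.writeInt(grade);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return baos.toByteArray();
  }

  // Reads the grade back from the stored bytes.
  static int decode(byte[] bytes) {
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bytes))) {
      return in.readInt();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(decode(encode(87)));
  }
}
```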

Otherwise, this class is an excellent example of using MR + HBase.  I've 
added a pointer to it on the wiki under the MR+HBase page (update the 
link if you update your code).

Thanks,
St.Ack

Re: Map Reduce over HBase - sample code

Posted by stack <st...@duboce.net>.
Thank you Naama.  I updated the link on the wiki to point to this latest version.
St.Ack



Re: Map Reduce over HBase - sample code

Posted by David B <da...@gmail.com>.
Hi All,
I've recently wanted to move from running map reduce over files to
processing tables.
The example code here was very useful, but it required some massaging to
make it compile with version 0.19.3 of hbase and 0.19.1 of hadoop core.

After fixing and checking the code worked, I figured it would be helpful for
future reference to post it back.
I have attached the code as two files - GradesTable.java and
GradesTableMapReduce.java.

The UI for uploading files was odd, so I may re-post if it appears to have
failed. - Preview looks good though.

Best Regards,
Dave.

http://www.nabble.com/file/p25107305/GradesTable.java GradesTable.java 
http://www.nabble.com/file/p25107305/GradesTableMapReduce.java
GradesTableMapReduce.java 



-- 
View this message in context: http://www.nabble.com/Map-Reduce-over-HBase---sample-code-tp18054248p25107305.html
Sent from the HBase User mailing list archive at Nabble.com.


Re: Map Reduce over HBase - sample code

Posted by Naama Kraus <na...@gmail.com>.
Hi,

Here is a version following the last comments.

Naama

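import java.io.IOException;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
// The HBase classes used below (HBaseAdmin, HBaseConfiguration,
// HColumnDescriptor, HTable, HTableDescriptor) must be imported as well;
// their packages depend on the HBase release in use.

/**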
 * <pre>
 * 'Toy tables' for experimenting with MapReduce over HBase
 *
 * grades table - an HBase table of the form -
 * row id is a student name
 * column name is Course:course_name
 * cell value is the student's grade in the course 'course_name'
 *
 * Example:
 *
 *         Course:Math  |  Course:Art  |  Course:Sports
 *         ----------------------------------------------
 * Dan        87                97              99
 * Dana       100               100             80
 *
 * =======================================
 *
 * courses table - an HBase table of the form -
 * row id is a course name
 * column name is Stats:Average
 * cell value is the average grade in that course, computed by a map reduce
 * job
 *
 * Example:
 *
 *            Stats:Average
 *            --------------
 *  Art          86
 *  Math         77
 * </pre>
 * @see GradesTableMapReduce
 *
 *
 */
public class GradesTable {

  public static final String GRADES_TABLE_NAME = "grades";
  public static final String COURSE_TABLE_NAME = "courses";
  public static final String COURSE_FAMILY = "Course:";
  // A column family holding grades statistics
  public static final String STATS_FAMILY = "Stats:";
  // A column member holding average grade in course
  public static final String AVG = "Average";

  private static final String [] STUDENT_NAMES = {
    "Dan", "Dana", "Sara", "David"
  };

  private static final String [] COURSE_NAMES = {
    "Math", "Art", "Sports"
  };

  private HBaseConfiguration conf;
  private HBaseAdmin admin;
  private HTableDescriptor grades_desc;
  private HTableDescriptor courses_desc;
  // Randomly generate a grade
  private Random rand;

  private static final Log LOG =
    LogFactory.getLog(GradesTable.class.getName());

  public GradesTable() throws IOException {
    conf = new HBaseConfiguration();
    admin = new HBaseAdmin(conf);
    grades_desc = new HTableDescriptor(GRADES_TABLE_NAME);
    courses_desc = new HTableDescriptor(COURSE_TABLE_NAME);
    rand = new Random();
  }

  /**
   * Create tables and populate with content
   */
  public void create() throws IOException {
    grades_desc.addFamily(new HColumnDescriptor(COURSE_FAMILY));
    courses_desc.addFamily(new HColumnDescriptor(STATS_FAMILY));
    admin.createTable(grades_desc);
    admin.createTable(courses_desc);
    LOG.info("Tables created");

    // Populate grades table with students and their grades in courses
    HTable table = new HTable(conf, new Text(GRADES_TABLE_NAME));

    // Start an update transaction, student name is row id
    for (int i = 0; i < STUDENT_NAMES.length; i++) {
      LOG.info("<<< Row " + i + ", student: " + STUDENT_NAMES[i] + " >>>");
      Text stuName = new Text(STUDENT_NAMES[i]);
      long writeid = table.startUpdate(stuName);
      for (int j = 0; j < COURSE_NAMES.length; j++) {
        Text courseColumn = new Text(COURSE_FAMILY + COURSE_NAMES[j]);
        // Put a cell with a student's grade (0 to 100) in this course.
        // nextInt(101) is never negative, unlike Math.abs(rand.nextInt()).
        int grade = rand.nextInt(101);
        table.put(writeid, courseColumn, new IntWritable(grade));
        LOG.info("Course: " + COURSE_NAMES[j] + ", grade: " + grade);
      }
      table.commit(writeid);
    }
    LOG.info("Grades Table populated");
  }

  public static void main(String [] args) {
    try {
      GradesTable gradesTable = new GradesTable();
      gradesTable.create();
    } catch (IOException e) {
      LOG.fatal("An exception occurred", e);
    }
  }
}
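
The grades in the toy table are meant to be random integers from 0 to 100. Below is a minimal sketch of drawing such a value, assuming only java.util.Random; the class RandomGradeSketch and the constant MAX_GRADE are names made up for this illustration. It also shows why taking Math.abs of a random int before the modulo is not quite safe: Math.abs(Integer.MIN_VALUE) is still negative.

```java
import java.util.Random;

// Hypothetical helper, separate from GradesTable.
class RandomGradeSketch {
  static final int MAX_GRADE = 100;

  // nextInt(bound) returns a value in [0, bound), so the bound is
  // MAX_GRADE + 1 to make 100 reachable.
  static int randomGrade(Random rand) {
    return rand.nextInt(MAX_GRADE + 1);
  }

  public static void main(String[] args) {
    Random rand = new Random();
    for (int i = 0; i < 5; i++) {
      System.out.println(randomGrade(rand));
    }
    // -Integer.MIN_VALUE does not fit in an int, so Math.abs returns
    // Integer.MIN_VALUE itself and the remainder is negative.
    System.out.println(Math.abs(Integer.MIN_VALUE) % 101 < 0);
  }
}
```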

=========================================================

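import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// The HBase classes used below (HStoreKey, ImmutableBytesWritable,
// TableMap, TableReduce, Writables) must be imported as well; their
// packages depend on the HBase release in use.

/**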
 * A map reduce job over {@link GradesTable}
 * The job produces for each course the average grade in that course.
 * It puts the average in a separate table which holds course statistics.
 *
 */
public class GradesTableMapReduce  extends Configured implements Tool {

  /**
   * Map a row to {key, value} pairs.
   * Emit a {course, grade} pair for each course grade appearing in the
   * student row.
   * E.g. Sara {Math:62, Art:45, Sports:87} -> {Math, 62}, {Art, 45},
   * {Sports, 87}
   *
   */
  public static class GradesTableMap extends TableMap<Text, IntWritable> {

    @Override
    public void map(HStoreKey key, MapWritable value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

      // Walk through the columns
      for (Map.Entry<Writable, Writable> e: value.entrySet()) {
        // Column name is course name
        Text course = (Text) e.getKey();
        // Remove the family prefix
        String courseStr = HStoreKey.extractQualifier(course).toString();
        course = new Text(courseStr);
        byte [] gradeInBytes =
          ((ImmutableBytesWritable) e.getValue()).get();
        IntWritable grade = new IntWritable();
        Writables.getWritable(gradeInBytes, grade);

        // Emit course name and a grade
        output.collect(course, grade);
      }
    }
  }

  /**
   * Reduce - compute the average of a key's values, which is the
   * average grade in each course.
   * E.g. {Math, {62, 45, 87}} -> {Math, 64.67}
   *
   */
  public static class GradesTableReduce
      extends TableReduce<Text, IntWritable> {

    @Override
    // key is course name, values are grades in the course
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, MapWritable> output, Reporter reporter)
    throws IOException {
      // Compute grades average
      int total = 0;
      int sum = 0;
      while (values.hasNext()) {
        total++;
        sum += values.next().get();
      }
      // Cast before dividing; int / int would truncate the average
      float average = (float) sum / total;

      // We put the average as a separate column in the courses table
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(baos);
      FloatWritable avgWritable = new FloatWritable(average);
      avgWritable.write(out);
      MapWritable map = new MapWritable();
      map.put(new Text(GradesTable.STATS_FAMILY + GradesTable.AVG),
              new ImmutableBytesWritable(baos.toByteArray()));
      output.collect(key, map);
    }
  }

  /**
   * Run
   */
  public int run(String[] args) throws Exception {
    // Start from the configuration handed over by ToolRunner
    JobConf jobConf = new JobConf(getConf(), GradesTableMapReduce.class);
    jobConf.setJobName("compute average grades");
    jobConf.setNumReduceTasks(1);

    // All columns in the course family (i.e. all grades) get into the map
    TableMap.initJob(GradesTable.GRADES_TABLE_NAME,
        GradesTable.COURSE_FAMILY, GradesTableMap.class, jobConf);

    // Reduce output (course average grade) is put in the courses table
    TableReduce.initJob(GradesTable.COURSE_TABLE_NAME,
        GradesTableReduce.class, jobConf);

    // Map produces a value which is an IntWritable
    jobConf.setMapOutputValueClass(IntWritable.class);

    JobClient.runJob(jobConf);
    return 0;
  }

  public static void main(String [] args) throws Exception {
    // Propagate the job's status as the process exit code
    int status =
      ToolRunner.run(new Configuration(), new GradesTableMapReduce(), args);
    System.exit(status);
  }
}
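
For anyone who wants to follow the data flow of the job above without a
cluster, here is a minimal plain-Java sketch of the same two steps: strip
the family prefix and group grades by course (map), then average each
group (reduce). It uses no Hadoop or HBase classes. The class and method
names (GradesAverageSketch, mapAndGroup, average), the "course:" prefix
and the students other than Sara are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; all names here are hypothetical.
class GradesAverageSketch {

  // Map step: strip the family prefix ("course:Math" -> "Math") and
  // collect every grade under its course name.
  static Map<String, List<Integer>> mapAndGroup(
      Map<String, Map<String, Integer>> rows) {
    Map<String, List<Integer>> grouped = new TreeMap<>();
    for (Map<String, Integer> columns : rows.values()) {
      for (Map.Entry<String, Integer> e : columns.entrySet()) {
        String column = e.getKey();
        String course = column.substring(column.indexOf(':') + 1);
        grouped.computeIfAbsent(course, k -> new ArrayList<>())
            .add(e.getValue());
      }
    }
    return grouped;
  }

  // Reduce step: average the grades of one course.
  // The cast matters: int / int would drop the fractional part.
  static float average(List<Integer> grades) {
    int sum = 0;
    for (int g : grades) {
      sum += g;
    }
    return (float) sum / grades.size();
  }

  // Helper that builds one student row from alternating column/grade pairs.
  static Map<String, Integer> row(Object... pairs) {
    Map<String, Integer> columns = new LinkedHashMap<>();
    for (int i = 0; i < pairs.length; i += 2) {
      columns.put((String) pairs[i], (Integer) pairs[i + 1]);
    }
    return columns;
  }

  public static void main(String[] args) {
    Map<String, Map<String, Integer>> rows = new LinkedHashMap<>();
    rows.put("Sara",
        row("course:Math", 62, "course:Art", 45, "course:Sports", 87));
    rows.put("Dan", row("course:Math", 45));   // made-up student
    rows.put("Noa", row("course:Math", 87));   // made-up student

    Map<String, List<Integer>> grouped = mapAndGroup(rows);
    for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
      System.out.println(e.getKey() + " -> " + average(e.getValue()));
    }
  }
}
```

With the Math grades {62, 45, 87}, integer division (194 / 3) gives 64,
while the cast version gives roughly 64.67.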

On Thu, Jul 3, 2008 at 9:44 AM, Naama Kraus <na...@gmail.com> wrote:

> Thanks St.Ack for the further comments and for putting a link in the Wiki.
> Naama
>
>
> On Tue, Jul 1, 2008 at 8:38 PM, stack <st...@duboce.net> wrote:
>
>> Comments in-line below:
>>
>> Naama Kraus wrote:
>>
>>> Here is an updated code
>>>
>>> Naama
>>>
>>> /**
>>>  * <pre>
>>>  * 'Toy tables' for experiencing with MapReduce over HBase
>>>
>>>
>> Do you mean 'experimenting' in the above?
>>
>> ....
>>
>>>  public void create() throws IOException {
>>>
>>>
>>
>> Where does this method get called?  I don't see it.
>>
>>
>>     System.out.println("Grades Table populated");
>>>
>>>
>>
>> Do you want to set up a logger to do the outputting instead?  See the head
>> of (most) hbase classes for example.  Look for 'LOG'.
>>
>>
>>   }
>>> }
>>>
>>>
>>> ====================================================
>>>
>>> /**
>>>  * A map reduce job over {@link GradesTable}
>>>  * The job produces for each course the average grade in that course.
>>>  * It puts the average in a separate table which holds course statistics.
>>>  *
>>>  */
>>> public class GradesTableMapReduce  extends Configured implements Tool {
>>>
>>>  /**
>>>   * Map a row to {key, value} pairs.
>>>   * Emit a {course, grade} pair for each course grade appearing in the
>>> student row.
>>>   * E.g. Sara {Math:62, Art:45, Sports:87} -> {Math, 62}, {Art, 45},
>>> {Sports, 87}
>>>   *
>>>   */
>>>  public static class GradesTableMap extends TableMap<Text, IntWritable> {
>>>
>>>    @Override
>>>    public void map(HStoreKey key, MapWritable value,
>>>        OutputCollector<Text, IntWritable> output, Reporter reporter)
>>> throws
>>> IOException {
>>>
>>>      // Walk through the columns
>>>      for (Map.Entry<Writable, Writable> e: value.entrySet()) {
>>>        // Column name is course name
>>>        Text course = (Text) e.getKey();
>>>        // Remove the family prefix
>>>        String courseStr = course.toString();
>>>        courseStr =
>>>          courseStr.substring(courseStr.indexOf(':') + 1);
>>>
>>>
>>
>> There may be utility in HStoreKey to do the above stripping of the column
>> family (getQualifier?).
>>
>>         course = new Text(courseStr);
>>>        byte [] gradeInBytes = ((ImmutableBytesWritable)
>>> e.getValue()).get();
>>>        DataInputStream in = new DataInputStream(new
>>> ByteArrayInputStream(gradeInBytes));
>>>        IntWritable grade = new IntWritable();
>>>        grade.readFields(in);
>>>
>>>
>>
>> You could have used Writables.getWritable above and saved yourself a few
>> lines (Not important).
>>
>> Otherwise, this class is an excellent example of using MR + HBase.  I've
>> added a pointer to it up on the wiki under the MR+HBase page (update the
>> link if you update your code).
>>
>> Thanks,
>> St.Ack
>>
>
>
>
> --
> oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo
> 00 oo 00 oo
> "If you want your children to be intelligent, read them fairy tales. If you
> want them to be more intelligent, read them more fairy tales." (Albert
> Einstein)
>
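
On the Writables.getWritable remark quoted above: both the hand-written
DataInputStream code and the helper end up decoding the cell's bytes back
into a value. A minimal round-trip sketch using only java.io follows. It
assumes that IntWritable and FloatWritable serialize through
DataOutput.writeInt and DataOutput.writeFloat (4 bytes each); that is my
understanding of Hadoop, not something verified in this thread. The class
and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// java.io-only illustration; stands in for the Writable classes.
class CellBytesSketch {

  // Encode a grade the way IntWritable.write is assumed to.
  static byte[] intToBytes(int value) {
    try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(baos);
      out.writeInt(value);
      out.flush();
      return baos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Decode a grade; the counterpart of readFields on an IntWritable.
  static int bytesToInt(byte[] bytes) {
    try {
      return new DataInputStream(new ByteArrayInputStream(bytes)).readInt();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Encode an average the way FloatWritable.write is assumed to.
  static byte[] floatToBytes(float value) {
    try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(baos);
      out.writeFloat(value);
      out.flush();
      return baos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Decode an average back from the stored bytes.
  static float bytesToFloat(byte[] bytes) {
    try {
      return new DataInputStream(new ByteArrayInputStream(bytes)).readFloat();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] grade = intToBytes(62);
    byte[] average = floatToBytes(64.5f);
    System.out.println(grade.length + " bytes -> " + bytesToInt(grade));
    System.out.println(average.length + " bytes -> " + bytesToFloat(average));
  }
}
```

A reader of the courses table would need the float decoding step to get
the stored average back, since the cell holds raw bytes rather than text.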




Re: Map Reduce over HBase - sample code

Posted by Naama Kraus <na...@gmail.com>.
Thanks St.Ack for the further comments and for putting a link in the Wiki.
Naama
