Posted to commits@hbase.apache.org by jg...@apache.org on 2009/09/17 20:37:24 UTC

svn commit: r816323 - in /hadoop/hbase/trunk: CHANGES.txt src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java

Author: jgray
Date: Thu Sep 17 18:37:23 2009
New Revision: 816323

URL: http://svn.apache.org/viewvc?rev=816323&view=rev
Log:
HBASE-1850 src/examples/mapred do not compile after HBASE-1822

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=816323&r1=816322&r2=816323&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Thu Sep 17 18:37:23 2009
@@ -33,6 +33,7 @@
                for when Writable is not Configurable (Stack via jgray)
    HBASE-1847  Delete latest of a null qualifier when non-null qualifiers
                exist throws a RuntimeException 
+   HBASE-1850  src/examples/mapred do not compile after HBASE-1822
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: hadoop/hbase/trunk/src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java?rev=816323&r1=816322&r2=816323&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java (original)
+++ hadoop/hbase/trunk/src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java Thu Sep 17 18:37:23 2009
@@ -1,139 +1,163 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/*
- * Sample uploader.
- * 
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * Sample Uploader MapReduce
+ * <p>
  * This is EXAMPLE code.  You will need to change it to work for your context.
- * 
- * Uses TableReduce to put the data into hbase. Change the InputFormat to suit
- * your data. Use the map to massage the input so it fits hbase.  Currently its
- * just a pass-through map.  In the reduce, you need to output a row and a
- * map of columns to cells.  Change map and reduce to suit your input.
- * 
- * <p>The below is wired up to handle an input whose format is a text file
- * which has a line format as follow:
- * <pre>
- * row columnname columndata
- * </pre>
- * 
- * <p>The table and columnfamily we're to insert into must preexist.
- * 
+ * <p>
+ * Writes the data into HBase via {@link org.apache.hadoop.hbase.mapreduce.TableOutputFormat}.
+ * Change the InputFormat to suit your data.  In this example, we are importing a CSV file.
+ * <p>
+ * <pre>row,family,qualifier,value</pre>
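+ * For example, a (hypothetical) input line might look like:
+ * <pre>row1,family1,qualifier1,value1</pre>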
+ * <p>
+ * The table and column family we are inserting into must already exist.
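+ * For example (the table and family names here are hypothetical), they
+ * could be created up front with {@link org.apache.hadoop.hbase.client.HBaseAdmin}:
+ * <pre>
+ * HBaseAdmin admin = new HBaseAdmin(new HBaseConfiguration());
+ * HTableDescriptor desc = new HTableDescriptor("TABLE_NAME");
+ * desc.addFamily(new HColumnDescriptor("family1"));
+ * admin.createTable(desc);
+ * </pre>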
+ * <p>
+ * There is no reducer in this example as it is not necessary and adds 
+ * significant overhead.  If you need to do any massaging of data before
+ * inserting into HBase, you can do this in the map as well.
  * <p>Do the following to start the MR job:
  * <pre>
- * ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.txt TABLE_NAME
+ * ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.csv TABLE_NAME
  * </pre>
- * 
- * <p>This code was written against hbase 0.1 branch.
+ * <p>
+ * This code was written against HBase 0.21 trunk.
  */
-public class SampleUploader extends MapReduceBase
-implements Mapper<LongWritable, Text, ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>>,
-    Tool {
-  private static final String NAME = "SampleUploader";
-  private Configuration conf;
-
-  public JobConf createSubmittableJob(String[] args)
-  throws IOException {
-    JobConf c = new JobConf(getConf(), SampleUploader.class);
-    c.setJobName(NAME);
-    FileInputFormat.setInputPaths(c, new Path(args[0]));
-    c.setMapperClass(this.getClass());
-    c.setMapOutputKeyClass(ImmutableBytesWritable.class);
-    c.setMapOutputValueClass(HbaseMapWritable.class);
-    c.setReducerClass(TableUploader.class);
-    TableMapReduceUtil.initTableReduceJob(args[1], TableUploader.class, c);
-    return c;
-  } 
-                                                                                                                                                                                                                                                                       
-  public void map(LongWritable k, Text v,
-      OutputCollector<ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>> output,
-      Reporter r)
-  throws IOException {
-     // Lines are space-delimited; first item is row, next the columnname and
-     // then the third the cell value.
-    String tmp = v.toString();
-    if (tmp.length() == 0) {
-      return;
-    }
-    String [] splits = v.toString().split(" ");
-    HbaseMapWritable<byte [], byte []> mw =
-      new HbaseMapWritable<byte [], byte []>();
-    mw.put(Bytes.toBytes(splits[1]), Bytes.toBytes(splits[2]));
-    byte [] row = Bytes.toBytes(splits[0]);
-    r.setStatus("Map emitting " + splits[0] + " for record " + k.toString());
-    output.collect(new ImmutableBytesWritable(row), mw);
-  }
+public class SampleUploader {
 
-  public static class TableUploader extends MapReduceBase
-  implements TableReduce<ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>> {
-    public void reduce(ImmutableBytesWritable k, Iterator<HbaseMapWritable<byte [], byte []>> v,
-        OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
-        Reporter r)
+  private static final String NAME = "SampleUploader";
+  
+  static class Uploader 
+  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
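+    // How often (in input lines) to update the task status; see map() below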
+    private long checkpoint = 100;
+    private long count = 0;
+    
+    @Override
+    public void map(LongWritable key, Text line, Context context)
     throws IOException {
-      while (v.hasNext()) {
-        r.setStatus("Reducer committing " + k);
-        BatchUpdate bu = new BatchUpdate(k.get());
-        while (v.hasNext()) {
-          HbaseMapWritable<byte [], byte []> hmw = v.next();
-          for (Entry<byte [], byte []> e: hmw.entrySet()) {
-            bu.put(e.getKey(), e.getValue());
-          }
-        }
-        output.collect(k, bu);
+      
+      // Input is a CSV file
+      // Each map() call receives a single line; the key is the byte offset of the line
+      // Each line is comma-delimited: row,family,qualifier,value
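+      // Note: naive parsing; assumes values contain no embedded commas or quoting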
+            
+      // Split CSV line
+      String [] values = line.toString().split(",");
+      if(values.length != 4) {
+        return;
+      }
+      
+      // Extract each value
+      byte [] row = Bytes.toBytes(values[0]);
+      byte [] family = Bytes.toBytes(values[1]);
+      byte [] qualifier = Bytes.toBytes(values[2]);
+      byte [] value = Bytes.toBytes(values[3]);
+      
+      // Create Put
+      Put put = new Put(row);
+      put.add(family, qualifier, value);
+      
+      // Uncomment below to disable WAL. This will improve performance but means
+      // you will experience data loss in the case of a RegionServer crash.
+      // put.setWriteToWAL(false);
+      
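+      // context.write() may throw InterruptedException; this record is skipped if so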
+      try {
+        context.write(new ImmutableBytesWritable(row), put);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      
+      // Set status every checkpoint lines
+      if(++count % checkpoint == 0) {
+        context.setStatus("Emitting Put " + count);
       }
     }
   }
-
-  static int printUsage() {
-    System.out.println(NAME + " <input> <table_name>");
-    return -1;
-  } 
-
-  public int run(@SuppressWarnings("unused") String[] args) throws Exception {
-    // Make sure there are exactly 2 parameters left.
-    if (args.length != 2) {
-      System.out.println("ERROR: Wrong number of parameters: " +
-        args.length + " instead of 2.");
-      return printUsage();
-    }
-    JobClient.runJob(createSubmittableJob(args));
-    return 0;
-  }
-
-  public Configuration getConf() {
-    return this.conf;
-  } 
-
-  public void setConf(final Configuration c) {
-    this.conf = c;
+  
+  /**
+   * Job configuration.
+   */
+  public static Job configureJob(Configuration conf, String [] args)
+  throws IOException {
+    Path inputPath = new Path(args[0]);
+    String tableName = args[1];
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(Uploader.class);
+    FileInputFormat.setInputPaths(job, inputPath);
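+    // Plain text input; each map() call receives one line of the CSV file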
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapperClass(Uploader.class);
+    // No reducers.  Just write straight to table.  Call initTableReducerJob
+    // because it sets up the TableOutputFormat.
+    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
+    job.setNumReduceTasks(0);
+    return job;
   }
 
+  /**
+   * Main entry point.
+   * 
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
   public static void main(String[] args) throws Exception {
-    int errCode = ToolRunner.run(new Configuration(), new SampleUploader(),
-      args);
-    System.exit(errCode);
+    HBaseConfiguration conf = new HBaseConfiguration();
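+    // GenericOptionsParser separates generic Hadoop options (-D, etc.) from job arguments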
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if(otherArgs.length != 2) {
+      System.err.println("Wrong number of arguments: " + otherArgs.length);
+      System.err.println("Usage: " + NAME + " <input> <tablename>");
+      System.exit(-1);
+    }
+    Job job = configureJob(conf, otherArgs);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
 }
\ No newline at end of file