You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2009/09/17 20:37:24 UTC
svn commit: r816323 - in /hadoop/hbase/trunk: CHANGES.txt
src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java
Author: jgray
Date: Thu Sep 17 18:37:23 2009
New Revision: 816323
URL: http://svn.apache.org/viewvc?rev=816323&view=rev
Log:
HBASE-1850 src/examples/mapred do not compile after HBASE-1822
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=816323&r1=816322&r2=816323&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Thu Sep 17 18:37:23 2009
@@ -33,6 +33,7 @@
for when Writable is not Configurable (Stack via jgray)
HBASE-1847 Delete latest of a null qualifier when non-null qualifiers
exist throws a RuntimeException
+ HBASE-1850 src/examples/mapred do not compile after HBASE-1822
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable
Modified: hadoop/hbase/trunk/src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java?rev=816323&r1=816322&r2=816323&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java (original)
+++ hadoop/hbase/trunk/src/examples/mapred/org/apache/hadoop/hbase/mapred/SampleUploader.java Thu Sep 17 18:37:23 2009
@@ -1,139 +1,148 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/*
- * Sample uploader.
- *
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * Sample Uploader MapReduce
+ * <p>
* This is EXAMPLE code. You will need to change it to work for your context.
- *
- * Uses TableReduce to put the data into hbase. Change the InputFormat to suit
- * your data. Use the map to massage the input so it fits hbase. Currently its
- * just a pass-through map. In the reduce, you need to output a row and a
- * map of columns to cells. Change map and reduce to suit your input.
- *
- * <p>The below is wired up to handle an input whose format is a text file
- * which has a line format as follow:
- * <pre>
- * row columnname columndata
- * </pre>
- *
- * <p>The table and columnfamily we're to insert into must preexist.
- *
+ * <p>
+ * Uses {@link TableMapReduceUtil} to set up the table output so that the
+ * {@link Put}s emitted by the map are written to HBase. Change the InputFormat
+ * to suit your data. In this example, we are importing a CSV file.
+ * <p>
+ * <pre>row,family,qualifier,value</pre>
+ * <p>
+ * The table and columnfamily we're to insert into must preexist.
+ * <p>
+ * There is no reducer in this example as it is not necessary and adds
+ * significant overhead. If you need to do any massaging of data before
+ * inserting into HBase, you can do this in the map as well.
* <p>Do the following to start the MR job:
* <pre>
- * ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.txt TABLE_NAME
+ * ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.csv TABLE_NAME
* </pre>
- *
- * <p>This code was written against hbase 0.1 branch.
+ * <p>
+ * This code was written against HBase 0.21 trunk.
*/
-public class SampleUploader extends MapReduceBase
-implements Mapper<LongWritable, Text, ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>>,
- Tool {
- private static final String NAME = "SampleUploader";
- private Configuration conf;
-
- public JobConf createSubmittableJob(String[] args)
- throws IOException {
- JobConf c = new JobConf(getConf(), SampleUploader.class);
- c.setJobName(NAME);
- FileInputFormat.setInputPaths(c, new Path(args[0]));
- c.setMapperClass(this.getClass());
- c.setMapOutputKeyClass(ImmutableBytesWritable.class);
- c.setMapOutputValueClass(HbaseMapWritable.class);
- c.setReducerClass(TableUploader.class);
- TableMapReduceUtil.initTableReduceJob(args[1], TableUploader.class, c);
- return c;
- }
-
- public void map(LongWritable k, Text v,
- OutputCollector<ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>> output,
- Reporter r)
- throws IOException {
- // Lines are space-delimited; first item is row, next the columnname and
- // then the third the cell value.
- String tmp = v.toString();
- if (tmp.length() == 0) {
- return;
- }
- String [] splits = v.toString().split(" ");
- HbaseMapWritable<byte [], byte []> mw =
- new HbaseMapWritable<byte [], byte []>();
- mw.put(Bytes.toBytes(splits[1]), Bytes.toBytes(splits[2]));
- byte [] row = Bytes.toBytes(splits[0]);
- r.setStatus("Map emitting " + splits[0] + " for record " + k.toString());
- output.collect(new ImmutableBytesWritable(row), mw);
- }
+public class SampleUploader {
- public static class TableUploader extends MapReduceBase
- implements TableReduce<ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>> {
- public void reduce(ImmutableBytesWritable k, Iterator<HbaseMapWritable<byte [], byte []>> v,
- OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
- Reporter r)
+ private static final String NAME = "SampleUploader";
+
+ /**
+ * Mapper that turns each comma-delimited input line of the form
+ * row,family,qualifier,value into a single {@link Put}, keyed by the row.
+ * Lines that do not split into exactly four fields are skipped.
+ */
+ static class Uploader
+ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
+ private long checkpoint = 100;
+ private long count = 0;
+
+ /**
+ * Parses one input line and emits the corresponding Put.
+ *
+ * @param key Key supplied by the InputFormat; not used.
+ * @param line One comma-delimited record: row,family,qualifier,value.
+ * @param context Task context used to emit the Put and report status.
+ * @throws IOException When the write fails or the task is interrupted
+ * while writing.
+ */
+ @Override
+ public void map(LongWritable key, Text line, Context context)
throws IOException {
- while (v.hasNext()) {
- r.setStatus("Reducer committing " + k);
- BatchUpdate bu = new BatchUpdate(k.get());
- while (v.hasNext()) {
- HbaseMapWritable<byte [], byte []> hmw = v.next();
- for (Entry<byte [], byte []> e: hmw.entrySet()) {
- bu.put(e.getKey(), e.getValue());
- }
- }
- output.collect(k, bu);
+
+ // Input is a CSV file
+ // Each map() is a single line; the key comes from the InputFormat and
+ // is not used here
+ // Each line is comma-delimited; row,family,qualifier,value
+
+ // Split CSV line
+ String [] values = line.toString().split(",");
+ if(values.length != 4) {
+ return;
+ }
+
+ // Extract each value
+ byte [] row = Bytes.toBytes(values[0]);
+ byte [] family = Bytes.toBytes(values[1]);
+ byte [] qualifier = Bytes.toBytes(values[2]);
+ byte [] value = Bytes.toBytes(values[3]);
+
+ // Create Put
+ Put put = new Put(row);
+ put.add(family, qualifier, value);
+
+ // Uncomment below to disable WAL. This will improve performance but means
+ // you will experience data loss in the case of a RegionServer crash.
+ // put.setWriteToWAL(false);
+
+ try {
+ context.write(new ImmutableBytesWritable(row), put);
+ } catch (InterruptedException e) {
+ // Do not swallow the interrupt: restore the thread's interrupt flag and
+ // fail the task instead of silently dropping this Put.
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while writing Put to the table output", e);
+ }
+
+ // Set status every checkpoint lines
+ if(++count % checkpoint == 0) {
+ context.setStatus("Emitting Put " + count);
}
}
}
-
- static int printUsage() {
- System.out.println(NAME + " <input> <table_name>");
- return -1;
- }
-
- public int run(@SuppressWarnings("unused") String[] args) throws Exception {
- // Make sure there are exactly 2 parameters left.
- if (args.length != 2) {
- System.out.println("ERROR: Wrong number of parameters: " +
- args.length + " instead of 2.");
- return printUsage();
- }
- JobClient.runJob(createSubmittableJob(args));
- return 0;
- }
-
- public Configuration getConf() {
- return this.conf;
- }
-
- public void setConf(final Configuration c) {
- this.conf = c;
+
+ /**
+ * Job configuration.
+ * <p>
+ * Builds a map-only job that reads text lines from the input path and
+ * writes the Puts emitted by {@link Uploader} to the named table.
+ *
+ * @param conf Configuration the job is created from.
+ * @param args Exactly two entries: args[0] is the input path, args[1] is
+ * the name of the table to write to.
+ * @return The configured, not yet submitted, job.
+ * @throws IOException When setting up the job fails.
+ */
+ public static Job configureJob(Configuration conf, String [] args)
+ throws IOException {
+ Path inputPath = new Path(args[0]);
+ String tableName = args[1];
+ Job job = new Job(conf, NAME + "_" + tableName);
+ job.setJarByClass(Uploader.class);
+ FileInputFormat.setInputPaths(job, inputPath);
+ // The input is a plain-text CSV file, one record per line, matching the
+ // <LongWritable, Text> input types of the Uploader mapper. A text file is
+ // not a SequenceFile, so SequenceFileInputFormat cannot read it.
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapperClass(Uploader.class);
+ // No reducers. Just write straight to table. Call initTableReducerJob
+ // because it sets up the TableOutputFormat.
+ TableMapReduceUtil.initTableReducerJob(tableName, null, job);
+ job.setNumReduceTasks(0);
+ return job;
}
+ /**
+ * Command-line entry point: parses the generic Hadoop options, expects the
+ * input path and the table name as the two remaining arguments, then runs
+ * the upload job and exits with 0 on success or 1 on failure.
+ *
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
public static void main(String[] args) throws Exception {
- int errCode = ToolRunner.run(new Configuration(), new SampleUploader(),
- args);
- System.exit(errCode);
+ HBaseConfiguration hbaseConf = new HBaseConfiguration();
+ GenericOptionsParser parser = new GenericOptionsParser(hbaseConf, args);
+ String[] jobArgs = parser.getRemainingArgs();
+ if (jobArgs.length != 2) {
+ System.err.println("Wrong number of arguments: " + jobArgs.length);
+ System.err.println("Usage: " + NAME + " <input> <tablename>");
+ System.exit(-1);
+ }
+ boolean succeeded = configureJob(hbaseConf, jobArgs).waitForCompletion(true);
+ int exitCode = succeeded ? 0 : 1;
+ System.exit(exitCode);
}
}
\ No newline at end of file