Posted to commits@hbase.apache.org by jd...@apache.org on 2010/01/16 01:02:11 UTC

svn commit: r899845 [2/2] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/mapred/ src/java/package-info/ src/test/org/apache/hadoop/hbase/mapred/

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,106 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Convert Map/Reduce output and write it to an HBase table
+ */
+@Deprecated
+public class TableOutputFormat extends
+FileOutputFormat<ImmutableBytesWritable, Put> {
+
+  /** JobConf parameter that specifies the output table */
+  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+  private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
+
+  /**
+   * Convert Reduce output (key, value) to (ImmutableBytesWritable, Put)
+   * and write to an HBase table
+   */
+  protected static class TableRecordWriter
+    implements RecordWriter<ImmutableBytesWritable, Put> {
+    private HTable m_table;
+
+    /**
+     * Instantiate a TableRecordWriter with an HTable instance for writing.
+     *
+     * @param table the HTable to write to
+     */
+    public TableRecordWriter(HTable table) {
+      m_table = table;
+    }
+
+    public void close(Reporter reporter)
+      throws IOException {
+      m_table.flushCommits();
+    }
+
+    public void write(ImmutableBytesWritable key,
+        Put value) throws IOException {
+      m_table.put(new Put(value));
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter getRecordWriter(FileSystem ignored,
+      JobConf job, String name, Progressable progress) throws IOException {
+
+    // Output goes to an HBase table, not a file; the FileSystem and name arguments are unused.
+
+    String tableName = job.get(OUTPUT_TABLE);
+    HTable table = null;
+    try {
+      table = new HTable(new HBaseConfiguration(job), tableName);
+    } catch(IOException e) {
+      LOG.error(e);
+      throw e;
+    }
+    table.setAutoFlush(false);
+    return new TableRecordWriter(table);
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+    String tableName = job.get(OUTPUT_TABLE);
+    if(tableName == null) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,39 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * Write a table, sorting by the input key
+ *
+ * @param <K> key class
+ * @param <V> value class
+ */
+@Deprecated
+@SuppressWarnings("unchecked")
+public interface TableReduce<K extends WritableComparable, V extends Writable>
+extends Reducer<K, V, ImmutableBytesWritable, Put> {
+
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A table split corresponds to a key range [low, high)
+ */
+@Deprecated
+public class TableSplit implements InputSplit, Comparable<TableSplit> {
+  private byte [] m_tableName;
+  private byte [] m_startRow;
+  private byte [] m_endRow;
+  private String m_regionLocation;
+
+  /** default constructor */
+  public TableSplit() {
+    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+      HConstants.EMPTY_BYTE_ARRAY, "");
+  }
+
+  /**
+   * Constructor
+   * @param tableName name of the table this split scans
+   * @param startRow inclusive start row of the split
+   * @param endRow exclusive end row of the split
+   * @param location hostname of the region server hosting this split's region
+   */
+  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+      final String location) {
+    this.m_tableName = tableName;
+    this.m_startRow = startRow;
+    this.m_endRow = endRow;
+    this.m_regionLocation = location;
+  }
+
+  /** @return table name */
+  public byte [] getTableName() {
+    return this.m_tableName;
+  }
+
+  /** @return starting row key */
+  public byte [] getStartRow() {
+    return this.m_startRow;
+  }
+
+  /** @return end row key */
+  public byte [] getEndRow() {
+    return this.m_endRow;
+  }
+
+  /** @return the region's hostname */
+  public String getRegionLocation() {
+    return this.m_regionLocation;
+  }
+
+  public String[] getLocations() {
+    return new String[] {this.m_regionLocation};
+  }
+
+  public long getLength() {
+    // Not clear how to obtain this... seems to be used only for sorting splits
+    return 0;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.m_tableName = Bytes.readByteArray(in);
+    this.m_startRow = Bytes.readByteArray(in);
+    this.m_endRow = Bytes.readByteArray(in);
+    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+  }
+
+  public void write(DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, this.m_tableName);
+    Bytes.writeByteArray(out, this.m_startRow);
+    Bytes.writeByteArray(out, this.m_endRow);
+    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+  }
+
+  @Override
+  public String toString() {
+    return m_regionLocation + ":" +
+      Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow);
+  }
+
+  public int compareTo(TableSplit o) {
+    return Bytes.compareTo(getStartRow(), o.getStartRow());
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/package-info.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/package-info.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/package-info.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
+Input/OutputFormats, a table indexing MapReduce job, and utility methods.
+
+<h2>Table of Contents</h2>
+<ul>
+<li><a href="#classpath">HBase, MapReduce and the CLASSPATH</a></li>
+<li><a href="#sink">HBase as MapReduce job data source and sink</a></li>
+<li><a href="#examples">Example Code</a></li>
+</ul>
+
+<h2><a name="classpath">HBase, MapReduce and the CLASSPATH</a></h2>
+
+<p>MapReduce jobs deployed to a MapReduce cluster do not by default have access
+to the HBase configuration under <code>$HBASE_CONF_DIR</code> nor to HBase classes.
+You could add <code>hbase-site.xml</code> to <code>$HADOOP_HOME/conf</code>, add
+<code>hbase-X.X.X.jar</code> to <code>$HADOOP_HOME/lib</code>, and copy these
+changes across your cluster, but the cleanest means of adding hbase configuration
+and classes to the cluster <code>CLASSPATH</code> is by uncommenting
+<code>HADOOP_CLASSPATH</code> in <code>$HADOOP_HOME/conf/hadoop-env.sh</code>
+and adding the path to the hbase jar and <code>$HBASE_CONF_DIR</code> directory.
+Then copy the amended configuration around the cluster.
+You'll probably need to restart the MapReduce cluster if you want it to notice
+the new configuration.
+</p>
+
+<p>For example, here is how you would amend <code>hadoop-env.sh</code> to add the
+built hbase jar, the hbase conf, and the <code>PerformanceEvaluation</code> class from
+the built hbase test jar to the hadoop <code>CLASSPATH</code>:
+
+<blockquote><pre># Extra Java CLASSPATH elements. Optional.
+# export HADOOP_CLASSPATH=
+export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf</pre></blockquote>
+
+<p>Expand <code>$HBASE_HOME</code> in the above appropriately to suit your
+local environment.</p>
+
+<p>After copying the above change around your cluster, this is how you would run
+the PerformanceEvaluation MR job to put up 4 clients (this presumes a running
+mapreduce cluster):
+
+<blockquote><pre>$HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4</pre></blockquote>
+
+The PerformanceEvaluation class will be found on the CLASSPATH because you
+added <code>$HBASE_HOME/build/test</code> to <code>HADOOP_CLASSPATH</code>.
+</p>
+
+<p>Another possibility, if for example you do not have access to hadoop-env.sh or
+are unable to restart the hadoop cluster, is bundling hbase into your mapreduce
+job jar: add the hbase jar and its dependencies under the job jar <code>lib/</code>
+directory and the hbase conf into a job jar <code>conf/</code> directory.
+</p>
+
+<h2><a name="sink">HBase as MapReduce job data source and sink</a></h2>
+
+<p>HBase can be used as a data source, {@link org.apache.hadoop.hbase.mapred.TableInputFormat TableInputFormat},
+and data sink, {@link org.apache.hadoop.hbase.mapred.TableOutputFormat TableOutputFormat}, for MapReduce jobs.
+When writing MapReduce jobs that read or write HBase, you'll probably want to implement
+{@link org.apache.hadoop.hbase.mapred.TableMap TableMap} and/or
+{@link org.apache.hadoop.hbase.mapred.TableReduce TableReduce}.  See the do-nothing
+pass-through classes {@link org.apache.hadoop.hbase.mapred.IdentityTableMap IdentityTableMap} and
+{@link org.apache.hadoop.hbase.mapred.IdentityTableReduce IdentityTableReduce} for basic usage.  For a more
+involved example, see {@link org.apache.hadoop.hbase.mapred.BuildTableIndex BuildTableIndex}
+or review the <code>org.apache.hadoop.hbase.mapred.TestTableMapReduce</code> unit test.
+</p>
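+
+<p>As a rough sketch (not a canonical recipe), a job that maps over a table and
+writes back into it might be wired up as follows.  Here <code>MyJob</code>,
+<code>MyMapper</code>, <code>myTable</code> and <code>myFamily</code> are
+placeholders for your own classes and names; the mapper is assumed to emit
+(ImmutableBytesWritable, Put) pairs the way ProcessContentsMapper does in the
+TestTableMapReduce unit test:</p>
+
+<blockquote><pre>// Hypothetical job setup; job, table, family and mapper names are placeholders.
+JobConf jobConf = new JobConf(new HBaseConfiguration(), MyJob.class);
+jobConf.setJobName("example table job");
+// Read family 'myFamily' from table 'myTable'; MyMapper emits (ImmutableBytesWritable, Put).
+TableMapReduceUtil.initTableMapJob("myTable", "myFamily",
+  MyMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);
+// Write the mapper output back into 'myTable' via the do-nothing reducer.
+TableMapReduceUtil.initTableReduceJob("myTable",
+  IdentityTableReduce.class, jobConf);
+JobClient.runJob(jobConf);</pre></blockquote>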
+
+<p>When running mapreduce jobs that have hbase as a source or a sink, you'll need
+to specify the source/sink table and column names in your job configuration.</p>
+
+<p>Reading from hbase, the TableInputFormat asks hbase for the list of
+regions and makes a map per region, or <code>mapred.map.tasks</code> maps,
+whichever is smaller (if your job only gets two maps, raise
+<code>mapred.map.tasks</code> to a number greater than the number of regions).
+Maps will run on the adjacent TaskTracker if you are running a TaskTracker
+and RegionServer per node.
+When writing, it may make sense to avoid the reduce step and write back into
+hbase from inside your map. You'd do this when your job does not need the sort
+and collation that mapreduce does on the map-emitted data; on insert,
+hbase 'sorts' so there is no point double-sorting (and shuffling data around
+your mapreduce cluster) unless you need to. If you do not need the reduce,
+you might just have your map emit counts of records processed so that the
+framework's report at the end of your job has meaning, or set the number of
+reduces to zero and use TableOutputFormat. See the example code
+below. If running the reduce step makes sense in your case, it's usually better
+to have lots of reducers so load is spread across the hbase cluster.</p>
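+
+<p>For instance, a reduce-less upload might be configured roughly as follows.
+This is only a sketch: <code>MyUploadJob</code>, <code>MyPutEmittingMapper</code>,
+<code>sourceTable</code>, <code>myFamily</code> and <code>targetTable</code> are
+placeholders for your own classes and names.</p>
+
+<blockquote><pre>// Sketch of a reduce-less upload; job, table, family and mapper names are placeholders.
+JobConf jobConf = new JobConf(new HBaseConfiguration(), MyUploadJob.class);
+jobConf.setJobName("map-only upload");
+// MyPutEmittingMapper is assumed to emit (ImmutableBytesWritable, Put) pairs.
+TableMapReduceUtil.initTableMapJob("sourceTable", "myFamily",
+  MyPutEmittingMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);
+// Skip the reduce/sort and write map output straight into the target table.
+jobConf.setNumReduceTasks(0);
+jobConf.setOutputFormat(TableOutputFormat.class);
+jobConf.set(TableOutputFormat.OUTPUT_TABLE, "targetTable");
+JobClient.runJob(jobConf);</pre></blockquote>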
+
+<p>There is also a new hbase partitioner that will run as many reducers as
+there are currently existing regions.  The
+{@link org.apache.hadoop.hbase.mapred.HRegionPartitioner} is suitable
+when your table is large and your upload will not greatly
+alter the number of existing regions when done; otherwise use the default
+partitioner.
+</p>
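+
+<p>A rough sketch of plugging in the partitioner (assuming <code>jobConf</code>
+has already been set up as in the sketches above, and <code>targetTable</code>
+is a placeholder for your output table):</p>
+
+<blockquote><pre>// Sketch only: one reducer per current region is a common, not mandatory, choice.
+HTable outputTable = new HTable(new HBaseConfiguration(), "targetTable");
+jobConf.setPartitionerClass(HRegionPartitioner.class);
+jobConf.setNumReduceTasks(outputTable.getStartKeys().length);</pre></blockquote>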
+
+<h2><a name="examples">Example Code</a></h2>
+<h3>Sample Row Counter</h3>
+<p>See {@link org.apache.hadoop.hbase.mapred.RowCounter}.  You should be able to run
+it by doing: <code>% ./bin/hadoop jar hbase-X.X.X.jar</code>.  This will invoke
+the hbase MapReduce Driver class.  Select 'rowcounter' from the choice of jobs
+offered. You may need to add the hbase conf directory to <code>HADOOP_CLASSPATH</code>
+in <code>$HADOOP_HOME/conf/hadoop-env.sh</code> so rowcounter gets pointed at the
+right hbase cluster (or build a new job jar with an appropriate hbase-site.xml
+built into it).
+</p>
+<h3>PerformanceEvaluation</h3>
+<p>See org.apache.hadoop.hbase.PerformanceEvaluation from hbase src/test.  It runs
+a mapreduce job that drives concurrent clients reading and writing hbase.
+</p>
+
+*/
+package org.apache.hadoop.hbase.mapred;
\ No newline at end of file

Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,244 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+public class TestTableMapReduce extends MultiRegionTable {
+  private static final Log LOG =
+    LogFactory.getLog(TestTableMapReduce.class.getName());
+
+  static final String MULTI_REGION_TABLE_NAME = "mrtest";
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+
+  private static final byte [][] columns = new byte [][] {
+    INPUT_FAMILY,
+    OUTPUT_FAMILY
+  };
+
+  /** constructor */
+  public TestTableMapReduce() {
+    super(Bytes.toString(INPUT_FAMILY));
+    desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
+    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
+  }
+
+  /**
+   * Pass the given key and processed record to reduce
+   */
+  public static class ProcessContentsMapper
+  extends MapReduceBase
+  implements TableMap<ImmutableBytesWritable, Put> {
+    /**
+     * Pass the key and the reversed value to reduce
+     * @param key
+     * @param value
+     * @param output
+     * @param reporter
+     * @throws IOException
+     */
+    public void map(ImmutableBytesWritable key, Result value,
+      OutputCollector<ImmutableBytesWritable, Put> output,
+      Reporter reporter)
+    throws IOException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+        cf = value.getMap();
+      if(!cf.containsKey(INPUT_FAMILY)) {
+        throw new IOException("Wrong input columns. Missing: '" +
+          Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+
+      // Get the original value and reverse it
+
+      String originalValue = new String(value.getValue(INPUT_FAMILY, null),
+        HConstants.UTF8_ENCODING);
+      StringBuilder newValue = new StringBuilder(originalValue);
+      newValue.reverse();
+
+      // Now set the value to be collected
+
+      Put outval = new Put(key.get());
+      outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+      output.collect(key, outval);
+    }
+  }
+
+  /**
+   * Test a map/reduce against a multi-region table
+   * @throws IOException
+   */
+  public void testMultiRegionTable() throws IOException {
+    runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
+  }
+
+  private void runTestOnTable(HTable table) throws IOException {
+    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
+
+    JobConf jobConf = null;
+    try {
+      LOG.info("Before map/reduce startup");
+      jobConf = new JobConf(conf, TestTableMapReduce.class);
+      jobConf.setJobName("process column contents");
+      jobConf.setNumReduceTasks(1);
+      TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
+        Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class,
+        ImmutableBytesWritable.class, Put.class, jobConf);
+      TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
+        IdentityTableReduce.class, jobConf);
+
+      LOG.info("Started " + Bytes.toString(table.getTableName()));
+      JobClient.runJob(jobConf);
+      LOG.info("After map/reduce completion");
+
+      // verify map-reduce results
+      verify(Bytes.toString(table.getTableName()));
+    } finally {
+      mrCluster.shutdown();
+      if (jobConf != null) {
+        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+      }
+    }
+  }
+
+  private void verify(String tableName) throws IOException {
+    HTable table = new HTable(conf, tableName);
+    boolean verified = false;
+    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = conf.getInt("hbase.client.retries.number", 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        LOG.info("Verification attempt #" + i);
+        verifyAttempt(table);
+        verified = true;
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty.  Presume it's because updates came in
+        // after the scanner had been opened.  Wait a while and retry.
+        LOG.debug("Verification attempt failed: " + e.getMessage());
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    assertTrue(verified);
+  }
+
+  /**
+   * Looks at every value of the mapreduce output and verifies that indeed
+   * the values have been reversed.
+   * @param table Table to scan.
+   * @throws IOException
+   * @throws NullPointerException if we failed to find a cell value
+   */
+  private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
+    Scan scan = new Scan();
+    scan.addColumns(columns);
+    ResultScanner scanner = table.getScanner(scan);
+    try {
+      for (Result r : scanner) {
+        if (LOG.isDebugEnabled()) {
+          if (r.size() > 2 ) {
+            throw new IOException("Too many results, expected 2 got " +
+              r.size());
+          }
+        }
+        byte[] firstValue = null;
+        byte[] secondValue = null;
+        int count = 0;
+        for (KeyValue kv : r.list()) {
+          if (count == 0) {
+            firstValue = kv.getValue();
+          }
+          if (count == 1) {
+            secondValue = kv.getValue();
+          }
+          count++;
+          if (count == 2) {
+            break;
+          }
+        }
+
+        String first = "";
+        if (firstValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+            ": first value is null");
+        }
+        first = new String(firstValue, HConstants.UTF8_ENCODING);
+
+        String second = "";
+        if (secondValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+            ": second value is null");
+        }
+        byte[] secondReversed = new byte[secondValue.length];
+        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+          secondReversed[i] = secondValue[j];
+        }
+        second = new String(secondReversed, HConstants.UTF8_ENCODING);
+
+        if (first.compareTo(second) != 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("second key is not the reverse of first. row=" +
+                r.getRow() + ", first value=" + first + ", second value=" +
+                second);
+          }
+          fail();
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+  }
+}
\ No newline at end of file