Posted to commits@hbase.apache.org by jd...@apache.org on 2010/01/16 01:02:11 UTC
svn commit: r899845 [2/2] - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/mapred/ src/java/package-info/
src/test/org/apache/hadoop/hbase/mapred/
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,106 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Convert Map/Reduce output and write it to an HBase table
+ */
+@Deprecated
+public class TableOutputFormat extends
+FileOutputFormat<ImmutableBytesWritable, Put> {
+
+ /** JobConf parameter that specifies the output table */
+ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+ private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
+
+ /**
+ * Write the Reduce output, an (ImmutableBytesWritable key, Put value)
+ * pair, to an HBase table
+ */
+ protected static class TableRecordWriter
+ implements RecordWriter<ImmutableBytesWritable, Put> {
+ private HTable m_table;
+
+ /**
+ * Instantiate a TableRecordWriter with the HTable to write to.
+ *
+ * @param table table to which puts are written
+ */
+ public TableRecordWriter(HTable table) {
+ m_table = table;
+ }
+
+ public void close(Reporter reporter)
+ throws IOException {
+ m_table.flushCommits();
+ }
+
+ public void write(ImmutableBytesWritable key,
+ Put value) throws IOException {
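+      // Copy the Put before buffering it, so later reuse of the value
+      // instance by the framework cannot alter the buffered edit.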
+ m_table.put(new Put(value));
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public RecordWriter getRecordWriter(FileSystem ignored,
+ JobConf job, String name, Progressable progress) throws IOException {
+
+    // Output goes to an HBase table rather than a file path; look the
+    // table name up in the job configuration.
+
+ String tableName = job.get(OUTPUT_TABLE);
+ HTable table = null;
+ try {
+ table = new HTable(new HBaseConfiguration(job), tableName);
+ } catch(IOException e) {
+ LOG.error(e);
+ throw e;
+ }
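+    // Buffer puts client-side; TableRecordWriter.close() flushes them
+    // via flushCommits().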
+ table.setAutoFlush(false);
+ return new TableRecordWriter(table);
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+ String tableName = job.get(OUTPUT_TABLE);
+ if(tableName == null) {
+ throw new IOException("Must specify table name");
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,39 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * Write a table, sorting by the input key
+ *
+ * @param <K> key class
+ * @param <V> value class
+ */
+@Deprecated
+@SuppressWarnings("unchecked")
+public interface TableReduce<K extends WritableComparable, V extends Writable>
+extends Reducer<K, V, ImmutableBytesWritable, Put> {
+
+}
\ No newline at end of file
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A table split corresponds to a key range [low, high)
+ */
+@Deprecated
+public class TableSplit implements InputSplit, Comparable<TableSplit> {
+ private byte [] m_tableName;
+ private byte [] m_startRow;
+ private byte [] m_endRow;
+ private String m_regionLocation;
+
+ /** default constructor */
+ public TableSplit() {
+ this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY, "");
+ }
+
+  /**
+   * Constructor
+   * @param tableName name of the table the split scans
+   * @param startRow inclusive start row of the split
+   * @param endRow exclusive end row of the split
+   * @param location hostname of the region server hosting the split's region
+   */
+ public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+ final String location) {
+ this.m_tableName = tableName;
+ this.m_startRow = startRow;
+ this.m_endRow = endRow;
+ this.m_regionLocation = location;
+ }
+
+ /** @return table name */
+ public byte [] getTableName() {
+ return this.m_tableName;
+ }
+
+ /** @return starting row key */
+ public byte [] getStartRow() {
+ return this.m_startRow;
+ }
+
+ /** @return end row key */
+ public byte [] getEndRow() {
+ return this.m_endRow;
+ }
+
+ /** @return the region's hostname */
+ public String getRegionLocation() {
+ return this.m_regionLocation;
+ }
+
+ public String[] getLocations() {
+ return new String[] {this.m_regionLocation};
+ }
+
+ public long getLength() {
+ // Not clear how to obtain this... seems to be used only for sorting splits
+ return 0;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.m_tableName = Bytes.readByteArray(in);
+ this.m_startRow = Bytes.readByteArray(in);
+ this.m_endRow = Bytes.readByteArray(in);
+ this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Bytes.writeByteArray(out, this.m_tableName);
+ Bytes.writeByteArray(out, this.m_startRow);
+ Bytes.writeByteArray(out, this.m_endRow);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+ }
+
+ @Override
+ public String toString() {
+ return m_regionLocation + ":" +
+ Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow);
+ }
+
+ public int compareTo(TableSplit o) {
+ return Bytes.compareTo(getStartRow(), o.getStartRow());
+ }
+}
\ No newline at end of file
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/package-info.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/package-info.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/package-info.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
+Input/OutputFormats, a table indexing MapReduce job, and utility methods.
+
+<h2>Table of Contents</h2>
+<ul>
+<li><a href="#classpath">HBase, MapReduce and the CLASSPATH</a></li>
+<li><a href="#sink">HBase as MapReduce job data source and sink</a></li>
+<li><a href="#examples">Example Code</a></li>
+</ul>
+
+<h2><a name="classpath">HBase, MapReduce and the CLASSPATH</a></h2>
+
+<p>MapReduce jobs deployed to a MapReduce cluster do not by default have access
+to the HBase configuration under <code>$HBASE_CONF_DIR</code> nor to HBase classes.
+You could add <code>hbase-site.xml</code> to $HADOOP_HOME/conf and add
+<code>hbase-X.X.X.jar</code> to the <code>$HADOOP_HOME/lib</code> and copy these
+changes across your cluster but the cleanest means of adding hbase configuration
+and classes to the cluster <code>CLASSPATH</code> is by uncommenting
+<code>HADOOP_CLASSPATH</code> in <code>$HADOOP_HOME/conf/hadoop-env.sh</code>
+and adding the path to the hbase jar and <code>$HBASE_CONF_DIR</code> directory.
+Then copy the amended configuration around the cluster.
+You'll probably need to restart the MapReduce cluster if you want it to notice
+the new configuration.
+</p>
+
+<p>For example, here is how you would amend <code>hadoop-env.sh</code> to add the
+built hbase jar, the hbase conf, and the <code>PerformanceEvaluation</code> class from
+the built hbase test jar to the hadoop <code>CLASSPATH</code>:
+
+<blockquote><pre># Extra Java CLASSPATH elements. Optional.
+# export HADOOP_CLASSPATH=
+export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf</pre></blockquote>
+
+<p>Expand <code>$HBASE_HOME</code> in the above appropriately to suit your
+local environment.</p>
+
+<p>After copying the above change around your cluster, this is how you would run
+the PerformanceEvaluation MR job to start 4 clients (this presumes a running
+mapreduce cluster):
+
+<blockquote><pre>$HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4</pre></blockquote>
+
+The PerformanceEvaluation class will be found on the CLASSPATH because you
+added <code>$HBASE_HOME/build/test</code> to <code>HADOOP_CLASSPATH</code>.
+</p>
+
+<p>Another possibility, if for example you do not have access to hadoop-env.sh or
+are unable to restart the hadoop cluster, is bundling the hbase jar into your
+mapreduce job jar: add it and its dependencies under the job jar's <code>lib/</code>
+directory and the hbase conf into a job jar <code>conf/</code> directory.
+</p>
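+
+<p>For example, a job jar might be laid out like this (all names illustrative):
+
+<blockquote><pre>myjob.jar
+|-- MyMap.class, MyJob.class, ...
+|-- conf/
+|   `-- hbase-site.xml
+`-- lib/
+    `-- hbase-X.X.X.jar (and its dependencies)</pre></blockquote>
+</p>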
+
+<h2><a name="sink">HBase as MapReduce job data source and sink</a></h2>
+
+<p>HBase can be used as a data source, {@link org.apache.hadoop.hbase.mapred.TableInputFormat TableInputFormat},
+and data sink, {@link org.apache.hadoop.hbase.mapred.TableOutputFormat TableOutputFormat}, for MapReduce jobs.
+When writing MapReduce jobs that read or write HBase, you'll probably want to subclass
+{@link org.apache.hadoop.hbase.mapred.TableMap TableMap} and/or
+{@link org.apache.hadoop.hbase.mapred.TableReduce TableReduce}. See the do-nothing
+pass-through classes {@link org.apache.hadoop.hbase.mapred.IdentityTableMap IdentityTableMap} and
+{@link org.apache.hadoop.hbase.mapred.IdentityTableReduce IdentityTableReduce} for basic usage. For a more
+involved example, see {@link org.apache.hadoop.hbase.mapred.BuildTableIndex BuildTableIndex}
+or review the <code>org.apache.hadoop.hbase.mapred.TestTableMapReduce</code> unit test.
+</p>
+
+<p>When running mapreduce jobs that use hbase as source or sink, you'll need to
+specify the source/sink table and column names in your configuration, as in the
+sketch below.</p>
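+
+<p>Here is a minimal sketch of such a configuration. <code>MyMap</code>,
+<code>MyJob</code> and the table and family names are illustrative;
+<code>MyMap</code> is assumed to implement
+<code>TableMap&lt;ImmutableBytesWritable, Put&gt;</code> and emit <code>Put</code>s,
+which the identity reducer passes through to the sink table:
+
+<blockquote><pre>JobConf jobConf = new JobConf(new HBaseConfiguration(), MyJob.class);
+jobConf.setJobName("example hbase job");
+// Source: scan the 'contents' family of 'sourcetable', feeding MyMap.
+TableMapReduceUtil.initTableMapJob("sourcetable", "contents",
+  MyMap.class, ImmutableBytesWritable.class, Put.class, jobConf);
+// Sink: write the emitted Puts into 'targettable' via TableOutputFormat.
+TableMapReduceUtil.initTableReduceJob("targettable",
+  IdentityTableReduce.class, jobConf);
+JobClient.runJob(jobConf);</pre></blockquote>
+</p>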
+
+<p>Reading from hbase, the TableInputFormat asks hbase for the list of
+regions and makes a map per region, or <code>mapred.map.tasks</code> maps,
+whichever is smaller (if your job only has two maps, raise
+<code>mapred.map.tasks</code> to a number greater than the number of regions). Maps
+will run on the adjacent TaskTracker if you are running a TaskTracker and
+RegionServer per node.
+When writing, it may make sense to avoid the reduce step and write back into
+hbase from inside your map. You'd do this when your job does not need the sort
+and collation that mapreduce does on the map-emitted data; on insert,
+hbase 'sorts' anyway, so there is no point double-sorting (and shuffling data
+around your mapreduce cluster) unless you need to. If you do not need the reduce,
+you might just have your map emit counts of records processed so the
+framework's report at the end of your job has meaning, or set the number of
+reduces to zero and use TableOutputFormat. See the example code
+below. If running the reduce step makes sense in your case, it's usually better
+to have lots of reducers so load is spread across the hbase cluster.</p>
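+
+<p>For the map-only variant, the relevant configuration is roughly as follows
+(the table name is illustrative):
+
+<blockquote><pre>// Skip the shuffle/sort: the map writes (ImmutableBytesWritable, Put) directly.
+jobConf.setNumReduceTasks(0);
+jobConf.setOutputFormat(TableOutputFormat.class);
+// TableOutputFormat reads the sink table name from this job property.
+jobConf.set(TableOutputFormat.OUTPUT_TABLE, "targettable");
+jobConf.setOutputKeyClass(ImmutableBytesWritable.class);
+jobConf.setOutputValueClass(Put.class);</pre></blockquote>
+</p>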
+
+<p>There is also a new hbase partitioner that will run as many reducers as
+there are currently existing regions. The
+{@link org.apache.hadoop.hbase.mapred.HRegionPartitioner} is suitable
+when your table is large and your upload will not greatly
+alter the number of existing regions when done; otherwise, use the default
+partitioner. A sketch of wiring it in follows.
+</p>
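+
+<blockquote><pre>// Route each partition to the region currently serving its key range.
+jobConf.setPartitionerClass(HRegionPartitioner.class);
+// One reducer per existing region of the sink table; regionCount is assumed
+// fetched beforehand, e.g. new HTable(conf, "targettable").getStartKeys().length.
+jobConf.setNumReduceTasks(regionCount);</pre></blockquote>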
+
+<h2><a name="examples">Example Code</a></h2>
+<h3>Sample Row Counter</h3>
+<p>See {@link org.apache.hadoop.hbase.mapred.RowCounter}. You should be able to run
+it by doing: <code>% ./bin/hadoop jar hbase-X.X.X.jar</code>. This will invoke
+the hbase MapReduce Driver class. Select 'rowcounter' from the choice of jobs
+offered. You may need to add the hbase conf directory to <code>$HADOOP_HOME/conf/hadoop-env.sh#HADOOP_CLASSPATH</code>
+so the rowcounter gets pointed at the right hbase cluster (or build a new job jar
+with an appropriate hbase-site.xml bundled in).
+</p>
+<h3>PerformanceEvaluation</h3>
+<p>See org.apache.hadoop.hbase.PerformanceEvaluation from hbase src/test. It runs
+a mapreduce job with concurrent clients reading and writing hbase.
+</p>
+
+*/
+package org.apache.hadoop.hbase.mapred;
\ No newline at end of file
Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,244 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+public class TestTableMapReduce extends MultiRegionTable {
+ private static final Log LOG =
+ LogFactory.getLog(TestTableMapReduce.class.getName());
+
+ static final String MULTI_REGION_TABLE_NAME = "mrtest";
+ static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+ static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+
+ private static final byte [][] columns = new byte [][] {
+ INPUT_FAMILY,
+ OUTPUT_FAMILY
+ };
+
+ /** constructor */
+ public TestTableMapReduce() {
+ super(Bytes.toString(INPUT_FAMILY));
+ desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
+ desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
+ }
+
+ /**
+ * Pass the given key and processed record to reduce
+ */
+ public static class ProcessContentsMapper
+ extends MapReduceBase
+ implements TableMap<ImmutableBytesWritable, Put> {
+ /**
+ * Pass the key and the reversed value to reduce
+ * @param key
+ * @param value
+ * @param output
+ * @param reporter
+ * @throws IOException
+ */
+ public void map(ImmutableBytesWritable key, Result value,
+ OutputCollector<ImmutableBytesWritable, Put> output,
+ Reporter reporter)
+ throws IOException {
+ if (value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+ cf = value.getMap();
+ if(!cf.containsKey(INPUT_FAMILY)) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILY) + "'.");
+ }
+
+ // Get the original value and reverse it
+
+ String originalValue = new String(value.getValue(INPUT_FAMILY, null),
+ HConstants.UTF8_ENCODING);
+ StringBuilder newValue = new StringBuilder(originalValue);
+ newValue.reverse();
+
+ // Now set the value to be collected
+
+ Put outval = new Put(key.get());
+ outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+ output.collect(key, outval);
+ }
+ }
+
+ /**
+ * Test a map/reduce against a multi-region table
+ * @throws IOException
+ */
+ public void testMultiRegionTable() throws IOException {
+ runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
+ }
+
+ private void runTestOnTable(HTable table) throws IOException {
+ MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
+
+ JobConf jobConf = null;
+ try {
+ LOG.info("Before map/reduce startup");
+ jobConf = new JobConf(conf, TestTableMapReduce.class);
+ jobConf.setJobName("process column contents");
+ jobConf.setNumReduceTasks(1);
+ TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
+ Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class,
+ ImmutableBytesWritable.class, Put.class, jobConf);
+ TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
+ IdentityTableReduce.class, jobConf);
+
+ LOG.info("Started " + Bytes.toString(table.getTableName()));
+ JobClient.runJob(jobConf);
+ LOG.info("After map/reduce completion");
+
+ // verify map-reduce results
+ verify(Bytes.toString(table.getTableName()));
+ } finally {
+ mrCluster.shutdown();
+ if (jobConf != null) {
+ FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+ }
+ }
+ }
+
+ private void verify(String tableName) throws IOException {
+ HTable table = new HTable(conf, tableName);
+ boolean verified = false;
+ long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+ int numRetries = conf.getInt("hbase.client.retries.number", 5);
+ for (int i = 0; i < numRetries; i++) {
+ try {
+ LOG.info("Verification attempt #" + i);
+ verifyAttempt(table);
+ verified = true;
+ break;
+ } catch (NullPointerException e) {
+        // If here, a cell was empty. Presume it's because updates came in
+ // after the scanner had been opened. Wait a while and retry.
+ LOG.debug("Verification attempt failed: " + e.getMessage());
+ }
+ try {
+ Thread.sleep(pause);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ assertTrue(verified);
+ }
+
+ /**
+ * Looks at every value of the mapreduce output and verifies that indeed
+ * the values have been reversed.
+ * @param table Table to scan.
+ * @throws IOException
+ * @throws NullPointerException if we failed to find a cell value
+ */
+ private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
+ Scan scan = new Scan();
+ scan.addColumns(columns);
+ ResultScanner scanner = table.getScanner(scan);
+ try {
+ for (Result r : scanner) {
+ if (LOG.isDebugEnabled()) {
+          if (r.size() > 2) {
+ throw new IOException("Too many results, expected 2 got " +
+ r.size());
+ }
+ }
+ byte[] firstValue = null;
+ byte[] secondValue = null;
+ int count = 0;
+ for(KeyValue kv : r.list()) {
+ if (count == 0) {
+ firstValue = kv.getValue();
+ }
+ if (count == 1) {
+ secondValue = kv.getValue();
+ }
+ count++;
+ if (count == 2) {
+ break;
+ }
+ }
+
+ String first = "";
+ if (firstValue == null) {
+ throw new NullPointerException(Bytes.toString(r.getRow()) +
+ ": first value is null");
+ }
+ first = new String(firstValue, HConstants.UTF8_ENCODING);
+
+ String second = "";
+ if (secondValue == null) {
+ throw new NullPointerException(Bytes.toString(r.getRow()) +
+ ": second value is null");
+ }
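+      // Reverse the bytes of the second value so it can be compared with the first.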
+ byte[] secondReversed = new byte[secondValue.length];
+ for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+ secondReversed[i] = secondValue[j];
+ }
+ second = new String(secondReversed, HConstants.UTF8_ENCODING);
+
+ if (first.compareTo(second) != 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("second key is not the reverse of first. row=" +
+ r.getRow() + ", first value=" + first + ", second value=" +
+ second);
+ }
+ fail();
+ }
+ }
+ } finally {
+ scanner.close();
+ }
+ }
+}
\ No newline at end of file