You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:46 UTC

[46/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
new file mode 100644
index 0000000..9811a97
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
@@ -0,0 +1,313 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A Base for {@link TableInputFormat}s. Receives a {@link Table}, a
+ * byte[] of input columns and optionally a {@link Filter}.
+ * Subclasses may use other TableRecordReader implementations.
+ *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+ * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
+ * <p>
+ * An example of a subclass:
+ * <pre>
+ *   class ExampleTIF extends TableInputFormatBase {
+ *
+ *     {@literal @}Override
+ *     protected void initialize(JobConf context) throws IOException {
+ *       // We are responsible for the lifecycle of this connection until we hand it over in
+ *       // initializeTable.
+ *       Connection connection =
+ *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ *       initializeTable(connection, tableName);
+ *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *         Bytes.toBytes("columnB") };
+ *       // mandatory
+ *       setInputColumns(inputColumns);
+ *       // optional, by default we'll get everything for the given columns.
+ *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ *       setRowFilter(exampleFilter);
+ *     }
+ *   }
+ * </pre>
+ */
+
+@InterfaceAudience.Public
+public abstract class TableInputFormatBase
+implements InputFormat<ImmutableBytesWritable, Result> {
+  private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
+  private byte [][] inputColumns;
+  private Table table;
+  private RegionLocator regionLocator;
+  private Connection connection;
+  private TableRecordReader tableRecordReader;
+  private Filter rowFilter;
+
+  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+      "method";
+  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+            " previous error. Please look at the previous logs lines from" +
+            " the task's full log for more details.";
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
+   * the default.
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter)
+  throws IOException {
+    // In case a subclass uses the deprecated approach or calls initializeTable directly
+    if (table == null) {
+      initialize(job);
+    }
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
+    }
+
+    TableSplit tSplit = (TableSplit) split;
+    // if no table record reader was provided use default
+    final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
+        this.tableRecordReader;
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return new RecordReader<ImmutableBytesWritable, Result>() {
+
+      @Override
+      public void close() throws IOException {
+        trr.close();
+        closeTable();
+      }
+
+      @Override
+      public ImmutableBytesWritable createKey() {
+        return trr.createKey();
+      }
+
+      @Override
+      public Result createValue() {
+        return trr.createValue();
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return trr.getPos();
+      }
+
+      @Override
+      public float getProgress() throws IOException {
+        return trr.getProgress();
+      }
+
+      @Override
+      public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+        return trr.next(key, value);
+      }
+    };
+  }
+
+  /**
+   * Calculates the splits that will serve as input for the map tasks.
+   *
+   * Splits are created in number equal to the smallest between numSplits and
+   * the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table.
+   * If the number of splits is smaller than the number of
+   * {@link org.apache.hadoop.hbase.regionserver.HRegion}s then splits are spanned across
+   * multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s
+   * and are grouped the most evenly possible. In the
+   * case splits are uneven the bigger splits are placed first in the
+   * {@link InputSplit} array.
+   *
+   * @param job the map task {@link JobConf}
+   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
+   *
+   * @return the input splits
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    if (this.table == null) {
+      initialize(job);
+    }
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
+    }
+
+    byte [][] startKeys = this.regionLocator.getStartKeys();
+    if (startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region");
+    }
+    if (this.inputColumns == null || this.inputColumns.length == 0) {
+      throw new IOException("Expecting at least one column");
+    }
+    int realNumSplits = numSplits > startKeys.length? startKeys.length:
+      numSplits;
+    InputSplit[] splits = new InputSplit[realNumSplits];
+    int middle = startKeys.length / realNumSplits;
+    int startPos = 0;
+    for (int i = 0; i < realNumSplits; i++) {
+      int lastPos = startPos + middle;
+      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]).
+        getHostname();
+      splits[i] = new TableSplit(this.table.getName(),
+        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
+          HConstants.EMPTY_START_ROW, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
+      startPos = lastPos;
+    }
+    return splits;
+  }
+
+  /**
+   * Allows subclasses to initialize the table information.
+   *
+   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close.
+   * @param tableName  The {@link TableName} of the table to process.
+   * @throws IOException
+   */
+  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+    if (this.table != null || this.connection != null) {
+      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
+          "reference; TableInputFormatBase will not close these old references when done.");
+    }
+    this.table = connection.getTable(tableName);
+    this.regionLocator = connection.getRegionLocator(tableName);
+    this.connection = connection;
+  }
+
+  /**
+   * @param inputColumns to be passed in {@link Result} to the map task.
+   */
+  protected void setInputColumns(byte [][] inputColumns) {
+    this.inputColumns = inputColumns;
+  }
+
+  /**
+   * Allows subclasses to get the {@link Table}.
+   */
+  protected Table getTable() {
+    if (table == null) {
+      throw new IllegalStateException(NOT_INITIALIZED);
+    }
+    return this.table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader
+   *                to provide other {@link TableRecordReader} implementations.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+
+  /**
+   * Allows subclasses to set the {@link Filter} to be used.
+   *
+   * @param rowFilter
+   */
+  protected void setRowFilter(Filter rowFilter) {
+    this.rowFilter = rowFilter;
+  }
+
+  /**
+   * Handle subclass specific set up.
+   * Each of the entry points used by the MapReduce framework,
+   * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+   * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+   * retrieving the necessary configuration information and calling
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * Subclasses should implement their initialize call such that it is safe to call multiple times.
+   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+   * if an initialize call is needed, but this behavior may change in the future. In particular,
+   * it is critical that initializeTable not be called multiple times since this will leak
+   * Connection instances.
+   *
+   */
+  protected void initialize(JobConf job) throws IOException {
+  }
+
+  /**
+   * Close the Table and related objects that were initialized via
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * @throws IOException
+   */
+  protected void closeTable() throws IOException {
+    close(table, connection);
+    table = null;
+    connection = null;
+  }
+
+  private void close(Closeable... closables) throws IOException {
+    for (Closeable c : closables) {
+      if(c != null) { c.close(); }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java
new file mode 100644
index 0000000..a9f1e61
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Mapper;
+
+/**
+ * Scan an HBase table to sort by a specified sort column.
+ * If the column does not exist, the record is not passed to Reduce.
+ *
+ * @param <K> WritableComparable key class
+ * @param <V> Writable value class
+ */
+@InterfaceAudience.Public
+public interface TableMap<K extends WritableComparable<? super K>, V>
+extends Mapper<ImmutableBytesWritable, Result, K, V> {
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
new file mode 100644
index 0000000..63ec418
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -0,0 +1,376 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
+import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Utility for {@link TableMap} and {@link TableReduce}
+ */
+@InterfaceAudience.Public
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TableMapReduceUtil {
+
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The table name to read from.
+   * @param columns  The columns to scan.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job configuration to adjust.
+   */
+  public static void initTableMapJob(String table, String columns,
+    Class<? extends TableMap> mapper,
+    Class<?> outputKeyClass,
+    Class<?> outputValueClass, JobConf job) {
+    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
+      true, TableInputFormat.class);
+  }
+
+  public static void initTableMapJob(String table, String columns,
+    Class<? extends TableMap> mapper,
+    Class<?> outputKeyClass,
+    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
+    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
+      addDependencyJars, TableInputFormat.class);
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The table name to read from.
+   * @param columns  The columns to scan.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job configuration to adjust.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *           job classes via the distributed cache (tmpjars).
+   */
+  public static void initTableMapJob(String table, String columns,
+    Class<? extends TableMap> mapper,
+    Class<?> outputKeyClass,
+    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
+    Class<? extends InputFormat> inputFormat) {
+
+    job.setInputFormat(inputFormat);
+    job.setMapOutputValueClass(outputValueClass);
+    job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapperClass(mapper);
+    job.setStrings("io.serializations", job.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName());
+    FileInputFormat.addInputPaths(job, table);
+    job.set(TableInputFormat.COLUMN_LIST, columns);
+    if (addDependencyJars) {
+      try {
+        addDependencyJars(job);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+    try {
+      initCredentials(job);
+    } catch (IOException ioe) {
+      // just spit out the stack trace?  really?
+      ioe.printStackTrace();
+    }
+  }
+
+  /**
+   * Sets up the job for reading from one or more multiple table snapshots, with one or more scans
+   * per snapshot.
+   * It bypasses hbase servers and read directly from snapshot files.
+   *
+   * @param snapshotScans     map of snapshot name to scans on that snapshot.
+   * @param mapper            The mapper class to use.
+   * @param outputKeyClass    The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job               The current job to adjust.  Make sure the passed job is
+   *                          carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *                          job classes via the distributed cache (tmpjars).
+   */
+  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+      Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+      JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
+    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);
+
+    job.setInputFormat(MultiTableSnapshotInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    if (addDependencyJars) {
+      addDependencyJars(job);
+    }
+
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
+  }
+
+  /**
+   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
+   * and read directly from snapshot files.
+   *
+   * @param snapshotName The name of the snapshot (of a table) to read from.
+   * @param columns  The columns to scan.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *           job classes via the distributed cache (tmpjars).
+   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
+   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+   * After the job is finished, restore directory can be deleted.
+   * @throws IOException When setting up the details fails.
+   * @see TableSnapshotInputFormat
+   */
+  public static void initTableSnapshotMapJob(String snapshotName, String columns,
+      Class<? extends TableMap> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, JobConf job,
+      boolean addDependencyJars, Path tmpRestoreDir)
+  throws IOException {
+    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
+    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
+      addDependencyJars, TableSnapshotInputFormat.class);
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReduceJob(String table,
+    Class<? extends TableReduce> reducer, JobConf job)
+  throws IOException {
+    initTableReduceJob(table, reducer, job, null);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job configuration to adjust.
+   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
+   * default partitioner.
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReduceJob(String table,
+    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
+  throws IOException {
+    initTableReduceJob(table, reducer, job, partitioner, true);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job configuration to adjust.
+   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
+   * default partitioner.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *           job classes via the distributed cache (tmpjars).
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReduceJob(String table,
+    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
+    boolean addDependencyJars) throws IOException {
+    job.setOutputFormat(TableOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Put.class);
+    job.setStrings("io.serializations", job.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName());
+    if (partitioner == HRegionPartitioner.class) {
+      job.setPartitionerClass(HRegionPartitioner.class);
+      int regions =
+        MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
+      if (job.getNumReduceTasks() > regions) {
+        job.setNumReduceTasks(regions);
+      }
+    } else if (partitioner != null) {
+      job.setPartitionerClass(partitioner);
+    }
+    if (addDependencyJars) {
+      addDependencyJars(job);
+    }
+    initCredentials(job);
+  }
+
+  public static void initCredentials(JobConf job) throws IOException {
+    UserProvider userProvider = UserProvider.instantiate(job);
+    if (userProvider.isHadoopSecurityEnabled()) {
+      // propagate delegation related props from launcher job to MR job
+      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+      }
+    }
+
+    if (userProvider.isHBaseSecurityEnabled()) {
+      Connection conn = ConnectionFactory.createConnection(job);
+      try {
+        // login the server principal (if using secure Hadoop)
+        User user = userProvider.getCurrent();
+        TokenUtil.addTokenForJob(conn, job, user);
+      } catch (InterruptedException ie) {
+        ie.printStackTrace();
+        Thread.currentThread().interrupt();
+      } finally {
+        conn.close();
+      }
+    }
+  }
+
+  /**
+   * Ensures that the given number of reduce tasks for the given job
+   * configuration does not exceed the number of regions for the given table.
+   *
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  // Used by tests.
+  public static void limitNumReduceTasks(String table, JobConf job)
+  throws IOException {
+    int regions =
+      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
+    if (job.getNumReduceTasks() > regions)
+      job.setNumReduceTasks(regions);
+  }
+
+  /**
+   * Ensures that the given number of map tasks for the given job
+   * configuration does not exceed the number of regions for the given table.
+   *
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  // Used by tests.
+  public static void limitNumMapTasks(String table, JobConf job)
+  throws IOException {
+    int regions =
+      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
+    if (job.getNumMapTasks() > regions)
+      job.setNumMapTasks(regions);
+  }
+
+  /**
+   * Sets the number of reduce tasks for the given job configuration to the
+   * number of regions the given table has.
+   *
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void setNumReduceTasks(String table, JobConf job)
+  throws IOException {
+    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
+      TableName.valueOf(table)));
+  }
+
+  /**
+   * Sets the number of map tasks for the given job configuration to the
+   * number of regions the given table has.
+   *
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void setNumMapTasks(String table, JobConf job)
+  throws IOException {
+    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
+      TableName.valueOf(table)));
+  }
+
+  /**
+   * Sets the number of rows to return and cache with each scanner iteration.
+   * Higher caching values will enable faster mapreduce jobs at the expense of
+   * requiring more heap to contain the cached rows.
+   *
+   * @param job The current job configuration to adjust.
+   * @param batchSize The number of rows to return in batch with each scanner
+   * iteration.
+   */
+  public static void setScannerCaching(JobConf job, int batchSize) {
+    job.setInt("hbase.client.scanner.caching", batchSize);
+  }
+
+  /**
+   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
+   */
+  public static void addDependencyJars(JobConf job) throws IOException {
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(
+      job,
+      // when making changes here, consider also mapreduce.TableMapReduceUtil
+      // pull job classes
+      job.getMapOutputKeyClass(),
+      job.getMapOutputValueClass(),
+      job.getOutputKeyClass(),
+      job.getOutputValueClass(),
+      job.getPartitionerClass(),
+      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
+      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
+      job.getCombinerClass());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
new file mode 100644
index 0000000..06b28ed
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Convert Map/Reduce output and write it to an HBase table
+ */
+@InterfaceAudience.Public
+public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
+
+  /** JobConf parameter that specifies the output table */
+  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+
+  /**
+   * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
+   * and write to an HBase table.
+   */
+  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
+    private BufferedMutator m_mutator;
+    private Connection conn;
+
+
+    /**
+     * Instantiate a TableRecordWriter with the HBase HClient for writing.
+     *
+     * @deprecated Please use {@code #TableRecordWriter(JobConf)}  This version does not clean up
+     * connections and will leak connections (removed in 2.0)
+     */
+    @Deprecated
+    public TableRecordWriter(final BufferedMutator mutator) throws IOException {
+      this.m_mutator = mutator;
+      this.conn = null;
+    }
+
+    /**
+     * Instantiate a TableRecordWriter with a BufferedMutator for batch writing.
+     */
+    public TableRecordWriter(JobConf job) throws IOException {
+      // expecting exactly one path
+      TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
+      try {
+        this.conn = ConnectionFactory.createConnection(job);
+        this.m_mutator = conn.getBufferedMutator(tableName);
+      } finally {
+        if (this.m_mutator == null) {
+          conn.close();
+          conn = null;
+        }
+      }
+    }
+
+    public void close(Reporter reporter) throws IOException {
+      try {
+        if (this.m_mutator != null) {
+          this.m_mutator.close();
+        }
+      } finally {
+        if (conn != null) {
+          this.conn.close();
+        }
+      }
+    }
+
+    public void write(ImmutableBytesWritable key, Put value) throws IOException {
+      m_mutator.mutate(new Put(value));
+    }
+  }
+
+  /**
+   * Creates a new record writer.
+   *
+   * Be aware that the baseline javadoc gives the impression that there is a single
+   * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
+   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
+   * Failure to do so will drop writes.
+   *
+   * @param ignored Ignored filesystem
+   * @param job Current JobConf
+   * @param name Name of the job
+   * @param progress
+   * @return The newly created writer instance.
+   * @throws IOException When creating the writer fails.
+   */
+  @Override
+  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
+      Progressable progress)
+  throws IOException {
+    // Clear write buffer on fail is true by default so no need to reset it.
+    return new TableRecordWriter(job);
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+    String tableName = job.get(OUTPUT_TABLE);
+    if (tableName == null) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
new file mode 100644
index 0000000..cecef7d
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.RecordReader;
+
+
+/**
+ * Iterate over an HBase table data, return (Text, RowResult) pairs
+ */
+@InterfaceAudience.Public
+public class TableRecordReader
+implements RecordReader<ImmutableBytesWritable, Result> {
+
+  private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow
+   * @throws IOException
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    this.recordReaderImpl.restart(firstRow);
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    this.recordReaderImpl.restart(this.recordReaderImpl.getStartRow());
+  }
+
+  /**
+   * @param htable the {@link org.apache.hadoop.hbase.HTableDescriptor} to scan.
+   */
+  public void setHTable(Table htable) {
+    this.recordReaderImpl.setHTable(htable);
+  }
+
+  /**
+   * @param inputColumns the columns to be placed in {@link Result}.
+   */
+  public void setInputColumns(final byte [][] inputColumns) {
+    this.recordReaderImpl.setInputColumns(inputColumns);
+  }
+
+  /**
+   * @param startRow the first row in the split
+   */
+  public void setStartRow(final byte [] startRow) {
+    this.recordReaderImpl.setStartRow(startRow);
+  }
+
+  /**
+   *
+   * @param endRow the last row in the split
+   */
+  public void setEndRow(final byte [] endRow) {
+    this.recordReaderImpl.setEndRow(endRow);
+  }
+
+  /**
+   * @param rowFilter the {@link Filter} to be used.
+   */
+  public void setRowFilter(Filter rowFilter) {
+    this.recordReaderImpl.setRowFilter(rowFilter);
+  }
+
+  public void close() {
+    this.recordReaderImpl.close();
+  }
+
+  /**
+   * @return ImmutableBytesWritable
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createKey()
+   */
+  public ImmutableBytesWritable createKey() {
+    return this.recordReaderImpl.createKey();
+  }
+
+  /**
+   * @return RowResult
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createValue()
+   */
+  public Result createValue() {
+    return this.recordReaderImpl.createValue();
+  }
+
+  public long getPos() {
+
+    // This should be the ordinal tuple in the range;
+    // not clear how to calculate...
+    return this.recordReaderImpl.getPos();
+  }
+
+  public float getProgress() {
+    // Depends on the total number of tuples and getPos
+    return this.recordReaderImpl.getPos();
+  }
+
+  /**
+   * @param key HStoreKey as input key.
+   * @param value MapWritable as input value
+   * @return true if there was more data
+   * @throws IOException
+   */
+  public boolean next(ImmutableBytesWritable key, Result value)
+  throws IOException {
+    return this.recordReaderImpl.next(key, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
new file mode 100644
index 0000000..f6b79c3
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
@@ -0,0 +1,259 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
+
+/**
+ * Iterate over an HBase table data, return (Text, RowResult) pairs
+ */
+@InterfaceAudience.Public
+public class TableRecordReaderImpl {
+  private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
+
+  private byte [] startRow;
+  private byte [] endRow;
+  private byte [] lastSuccessfulRow;
+  private Filter trrRowFilter;
+  private ResultScanner scanner;
+  private Table htable;
+  private byte [][] trrInputColumns;
+  private long timestamp;
+  private int rowcount;
+  private boolean logScannerActivity = false;
+  private int logPerRowCount = 100;
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow
+   * @throws IOException
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    Scan currentScan;
+    if ((endRow != null) && (endRow.length > 0)) {
+      if (trrRowFilter != null) {
+        Scan scan = new Scan(firstRow, endRow);
+        TableInputFormat.addColumns(scan, trrInputColumns);
+        scan.setFilter(trrRowFilter);
+        scan.setCacheBlocks(false);
+        this.scanner = this.htable.getScanner(scan);
+        currentScan = scan;
+      } else {
+        LOG.debug("TIFB.restart, firstRow: " +
+            Bytes.toStringBinary(firstRow) + ", endRow: " +
+            Bytes.toStringBinary(endRow));
+        Scan scan = new Scan(firstRow, endRow);
+        TableInputFormat.addColumns(scan, trrInputColumns);
+        this.scanner = this.htable.getScanner(scan);
+        currentScan = scan;
+      }
+    } else {
+      LOG.debug("TIFB.restart, firstRow: " +
+          Bytes.toStringBinary(firstRow) + ", no endRow");
+
+      Scan scan = new Scan(firstRow);
+      TableInputFormat.addColumns(scan, trrInputColumns);
+      scan.setFilter(trrRowFilter);
+      this.scanner = this.htable.getScanner(scan);
+      currentScan = scan;
+    }
+    if (logScannerActivity) {
+      LOG.info("Current scan=" + currentScan.toString());
+      timestamp = System.currentTimeMillis();
+      rowcount = 0;
+    }
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    restart(startRow);
+  }
+
+  byte[] getStartRow() {
+    return this.startRow;
+  }
+  /**
+   * @param htable the {@link org.apache.hadoop.hbase.HTableDescriptor} to scan.
+   */
+  public void setHTable(Table htable) {
+    Configuration conf = htable.getConfiguration();
+    logScannerActivity = conf.getBoolean(
+      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
+    this.htable = htable;
+  }
+
+  /**
+   * @param inputColumns the columns to be placed in {@link Result}.
+   */
+  public void setInputColumns(final byte [][] inputColumns) {
+    this.trrInputColumns = inputColumns;
+  }
+
+  /**
+   * @param startRow the first row in the split
+   */
+  public void setStartRow(final byte [] startRow) {
+    this.startRow = startRow;
+  }
+
+  /**
+   *
+   * @param endRow the last row in the split
+   */
+  public void setEndRow(final byte [] endRow) {
+    this.endRow = endRow;
+  }
+
+  /**
+   * @param rowFilter the {@link Filter} to be used.
+   */
+  public void setRowFilter(Filter rowFilter) {
+    this.trrRowFilter = rowFilter;
+  }
+
+  public void close() {
+    if (this.scanner != null) {
+      this.scanner.close();
+    }
+    try {
+      this.htable.close();
+    } catch (IOException ioe) {
+      LOG.warn("Error closing table", ioe);
+    }
+  }
+
+  /**
+   * @return ImmutableBytesWritable
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createKey()
+   */
+  public ImmutableBytesWritable createKey() {
+    return new ImmutableBytesWritable();
+  }
+
+  /**
+   * @return RowResult
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createValue()
+   */
+  public Result createValue() {
+    return new Result();
+  }
+
+  public long getPos() {
+    // This should be the ordinal tuple in the range;
+    // not clear how to calculate...
+    return 0;
+  }
+
+  public float getProgress() {
+    // Depends on the total number of tuples and getPos
+    return 0;
+  }
+
+  /**
+   * @param key HStoreKey as input key.
+   * @param value MapWritable as input value
+   * @return true if there was more data
+   * @throws IOException
+   */
+  public boolean next(ImmutableBytesWritable key, Result value)
+  throws IOException {
+    Result result;
+    try {
+      try {
+        result = this.scanner.next();
+        if (logScannerActivity) {
+          rowcount ++;
+          if (rowcount >= logPerRowCount) {
+            long now = System.currentTimeMillis();
+            LOG.info("Mapper took " + (now-timestamp)
+              + "ms to process " + rowcount + " rows");
+            timestamp = now;
+            rowcount = 0;
+          }
+        }
+      } catch (IOException e) {
+        // do not retry if the exception tells us not to do so
+        if (e instanceof DoNotRetryIOException) {
+          throw e;
+        }
+        // try to handle all other IOExceptions by restarting
+        // the scanner, if the second call fails, it will be rethrown
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));
+        if (lastSuccessfulRow == null) {
+          LOG.warn("We are restarting the first next() invocation," +
+              " if your mapper has restarted a few other times like this" +
+              " then you should consider killing this job and investigate" +
+              " why it's taking so long.");
+        }
+        if (lastSuccessfulRow == null) {
+          restart(startRow);
+        } else {
+          restart(lastSuccessfulRow);
+          this.scanner.next();    // skip presumed already mapped row
+        }
+        result = this.scanner.next();
+      }
+
+      if (result != null && result.size() > 0) {
+        key.set(result.getRow());
+        lastSuccessfulRow = key.get();
+        value.copyFrom(result);
+        return true;
+      }
+      return false;
+    } catch (IOException ioe) {
+      if (logScannerActivity) {
+        long now = System.currentTimeMillis();
+        LOG.info("Mapper took " + (now-timestamp)
+          + "ms to process " + rowcount + " rows");
+        LOG.info(ioe);
+        String lastRow = lastSuccessfulRow == null ?
+          "null" : Bytes.toStringBinary(lastSuccessfulRow);
+        LOG.info("lastSuccessfulRow=" + lastRow);
+      }
+      throw ioe;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java
new file mode 100644
index 0000000..91fb4a1
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * Write a table, sorting by the input key
+ *
+ * @param <K> key class
+ * @param <V> value class
+ */
+@InterfaceAudience.Public
+@SuppressWarnings("unchecked")
+public interface TableReduce<K extends WritableComparable, V>
+extends Reducer<K, V, ImmutableBytesWritable, Put> {
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
new file mode 100644
index 0000000..d7b49ff
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. Further
+ * documentation available on {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}.
+ *
+ * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ */
+@InterfaceAudience.Public
+public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
+
+  public static class TableSnapshotRegionSplit implements InputSplit {
+    private TableSnapshotInputFormatImpl.InputSplit delegate;
+
+    // constructor for mapreduce framework / Writable
+    public TableSnapshotRegionSplit() {
+      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
+    }
+
+    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
+      this.delegate = delegate;
+    }
+
+    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
+        List<String> locations, Scan scan, Path restoreDir) {
+      this.delegate =
+          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      return delegate.getLength();
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return delegate.getLocations();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      delegate.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      delegate.readFields(in);
+    }
+  }
+
+  static class TableSnapshotRecordReader
+    implements RecordReader<ImmutableBytesWritable, Result> {
+
+    private TableSnapshotInputFormatImpl.RecordReader delegate;
+
+    public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
+        throws IOException {
+      delegate = new TableSnapshotInputFormatImpl.RecordReader();
+      delegate.initialize(split.delegate, job);
+    }
+
+    @Override
+    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+      if (!delegate.nextKeyValue()) {
+        return false;
+      }
+      ImmutableBytesWritable currentKey = delegate.getCurrentKey();
+      key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
+      value.copyFrom(delegate.getCurrentValue());
+      return true;
+    }
+
+    @Override
+    public ImmutableBytesWritable createKey() {
+      return new ImmutableBytesWritable();
+    }
+
+    @Override
+    public Result createValue() {
+      return new Result();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return delegate.getPos();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return delegate.getProgress();
+    }
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    List<TableSnapshotInputFormatImpl.InputSplit> splits =
+      TableSnapshotInputFormatImpl.getSplits(job);
+    InputSplit[] results = new InputSplit[splits.size()];
+    for (int i = 0; i < splits.size(); i++) {
+      results[i] = new TableSnapshotRegionSplit(splits.get(i));
+    }
+    return results;
+  }
+
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result>
+  getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
+  }
+
+  /**
+   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+   * @param job the job to configure
+   * @param snapshotName the name of the snapshot to read from
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+   * After the job is finished, restoreDir can be deleted.
+   * @throws IOException if an error occurs
+   */
+  public static void setInput(JobConf job, String snapshotName, Path restoreDir)
+      throws IOException {
+    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
new file mode 100644
index 0000000..0784e5e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
@@ -0,0 +1,154 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A table split corresponds to a key range [low, high)
+ */
+@InterfaceAudience.Public
+public class TableSplit implements InputSplit, Comparable<TableSplit> {
+  private TableName m_tableName;
+  private byte [] m_startRow;
+  private byte [] m_endRow;
+  private String m_regionLocation;
+
+  /** default constructor */
+  public TableSplit() {
+    this((TableName)null, HConstants.EMPTY_BYTE_ARRAY,
+      HConstants.EMPTY_BYTE_ARRAY, "");
+  }
+
+  /**
+   * Constructor
+   * @param tableName
+   * @param startRow
+   * @param endRow
+   * @param location
+   */
+  public TableSplit(TableName tableName, byte [] startRow, byte [] endRow,
+      final String location) {
+    this.m_tableName = tableName;
+    this.m_startRow = startRow;
+    this.m_endRow = endRow;
+    this.m_regionLocation = location;
+  }
+
+  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+      final String location) {
+    this(TableName.valueOf(tableName), startRow, endRow,
+      location);
+  }
+
+  /** @return table name */
+  public TableName getTable() {
+    return this.m_tableName;
+  }
+
+  /** @return table name */
+   public byte [] getTableName() {
+     return this.m_tableName.getName();
+   }
+
+  /** @return starting row key */
+  public byte [] getStartRow() {
+    return this.m_startRow;
+  }
+
+  /** @return end row key */
+  public byte [] getEndRow() {
+    return this.m_endRow;
+  }
+
+  /** @return the region's hostname */
+  public String getRegionLocation() {
+    return this.m_regionLocation;
+  }
+
+  public String[] getLocations() {
+    return new String[] {this.m_regionLocation};
+  }
+
+  public long getLength() {
+    // Not clear how to obtain this... seems to be used only for sorting splits
+    return 0;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.m_tableName = TableName.valueOf(Bytes.readByteArray(in));
+    this.m_startRow = Bytes.readByteArray(in);
+    this.m_endRow = Bytes.readByteArray(in);
+    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+  }
+
+  public void write(DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, this.m_tableName.getName());
+    Bytes.writeByteArray(out, this.m_startRow);
+    Bytes.writeByteArray(out, this.m_endRow);
+    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+  }
+
+  @Override
+  public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("HBase table split(");
+      sb.append("table name: ").append(m_tableName);
+      sb.append(", start row: ").append(Bytes.toStringBinary(m_startRow));
+      sb.append(", end row: ").append(Bytes.toStringBinary(m_endRow));
+      sb.append(", region location: ").append(m_regionLocation);
+      sb.append(")");
+      return sb.toString();
+  }
+
+  @Override
+  public int compareTo(TableSplit o) {
+    return Bytes.compareTo(getStartRow(), o.getStartRow());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || !(o instanceof TableSplit)) {
+      return false;
+    }
+    TableSplit other = (TableSplit)o;
+    return m_tableName.equals(other.m_tableName) &&
+      Bytes.equals(m_startRow, other.m_startRow) &&
+      Bytes.equals(m_endRow, other.m_endRow) &&
+      m_regionLocation.equals(other.m_regionLocation);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = m_tableName != null ? m_tableName.hashCode() : 0;
+    result = 31 * result + Arrays.hashCode(m_startRow);
+    result = 31 * result + Arrays.hashCode(m_endRow);
+    result = 31 * result + (m_regionLocation != null ? m_regionLocation.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
new file mode 100644
index 0000000..1da3a52
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
+Input/OutputFormats, a table indexing MapReduce job, and utility methods.
+
+<p>See <a href="http://hbase.apache.org/book.html#mapreduce">HBase and MapReduce</a>
+in the HBase Reference Guide for mapreduce over hbase documentation.
+*/
+package org.apache.hadoop.hbase.mapred;

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
new file mode 100644
index 0000000..078033e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
@@ -0,0 +1,333 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+
+
+/**
+ * A job with a a map and reduce phase to count cells in a table.
+ * The counter lists the following stats for a given table:
+ * <pre>
+ * 1. Total number of rows in the table
+ * 2. Total number of CFs across all rows
+ * 3. Total qualifiers across all rows
+ * 4. Total occurrence of each CF
+ * 5. Total occurrence  of each qualifier
+ * 6. Total number of versions of each qualifier.
+ * </pre>
+ *
+ * The cellcounter can take optional parameters to use a user
+ * supplied row/family/qualifier string to use in the report and
+ * second a regex based or prefix based row filter to restrict the
+ * count operation to a limited subset of rows from the table or a
+ * start time and/or end time to limit the count to a time range.
+ */
+@InterfaceAudience.Public
+public class CellCounter extends Configured implements Tool {
+  private static final Log LOG =
+    LogFactory.getLog(CellCounter.class.getName());
+
+
+  /**
+   * Name of this 'program'.
+   */
+  static final String NAME = "CellCounter";
+
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  /**
+   * Mapper that runs the count.
+   */
+  static class CellCounterMapper
+  extends TableMapper<Text, IntWritable> {
+    /**
+     * Counter enumeration to count the actual rows.
+     */
+    public static enum Counters {
+      ROWS,
+      CELLS
+    }
+
+    private Configuration conf;
+    private String separator;
+
+    // state of current row, family, column needs to persist across map() invocations
+    // in order to properly handle scanner batching, where a single qualifier may have too
+    // many versions for a single map() call
+    private byte[] lastRow;
+    private String currentRowKey;
+    byte[] currentFamily = null;
+    String currentFamilyName = null;
+    byte[] currentQualifier = null;
+    // family + qualifier
+    String currentQualifierName = null;
+    // rowkey + family + qualifier
+    String currentRowQualifierName = null;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      conf = context.getConfiguration();
+      separator = conf.get("ReportSeparator",":");
+    }
+
+    /**
+     * Maps the data.
+     *
+     * @param row     The current table row key.
+     * @param values  The columns.
+     * @param context The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+     *      org.apache.hadoop.mapreduce.Mapper.Context)
+     */
+
+    @Override
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
+      justification="Findbugs is blind to the Precondition null check")
+    public void map(ImmutableBytesWritable row, Result values,
+                    Context context)
+        throws IOException {
+      Preconditions.checkState(values != null,
+          "values passed to the map is null");
+
+      try {
+        byte[] currentRow = values.getRow();
+        if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
+          lastRow = currentRow;
+          currentRowKey = Bytes.toStringBinary(currentRow);
+          currentFamily = null;
+          currentQualifier = null;
+          context.getCounter(Counters.ROWS).increment(1);
+          context.write(new Text("Total ROWS"), new IntWritable(1));
+        }
+        if (!values.isEmpty()) {
+          int cellCount = 0;
+          for (Cell value : values.listCells()) {
+            cellCount++;
+            if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) {
+              currentFamily = CellUtil.cloneFamily(value);
+              currentFamilyName = Bytes.toStringBinary(currentFamily);
+              currentQualifier = null;
+              context.getCounter("CF", currentFamilyName).increment(1);
+              if (1 == context.getCounter("CF", currentFamilyName).getValue()) {
+                context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
+                context.write(new Text(currentFamily), new IntWritable(1));
+              }
+            }
+            if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) {
+              currentQualifier = CellUtil.cloneQualifier(value);
+              currentQualifierName = currentFamilyName + separator +
+                  Bytes.toStringBinary(currentQualifier);
+              currentRowQualifierName = currentRowKey + separator + currentQualifierName;
+
+              context.write(new Text("Total Qualifiers across all Rows"),
+                  new IntWritable(1));
+              context.write(new Text(currentQualifierName), new IntWritable(1));
+            }
+            // Increment versions
+            context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1));
+          }
+          context.getCounter(Counters.CELLS).increment(cellCount);
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  static class IntSumReducer<Key> extends Reducer<Key, IntWritable,
+      Key, IntWritable> {
+
+    private IntWritable result = new IntWritable();
+    public void reduce(Key key, Iterable<IntWritable> values,
+      Context context)
+    throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf The current configuration.
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+      throws IOException {
+    String tableName = args[0];
+    Path outputDir = new Path(args[1]);
+    String reportSeparatorString = (args.length > 2) ? args[2]: ":";
+    conf.set("ReportSeparator", reportSeparatorString);
+    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    job.setJarByClass(CellCounter.class);
+    Scan scan = getConfiguredScanForJob(conf, args);
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+        CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
+    job.setNumReduceTasks(1);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setReducerClass(IntSumReducer.class);
+    return job;
+  }
+
+  private static Scan getConfiguredScanForJob(Configuration conf, String[] args)
+      throws IOException {
+    // create scan with any properties set from TableInputFormat
+    Scan s = TableInputFormat.createScanFromConfiguration(conf);
+    // Set Scan Versions
+    if (conf.get(TableInputFormat.SCAN_MAXVERSIONS) == null) {
+      // default to all versions unless explicitly set
+      s.setMaxVersions(Integer.MAX_VALUE);
+    }
+    s.setCacheBlocks(false);
+    // Set RowFilter or Prefix Filter if applicable.
+    Filter rowFilter = getRowFilter(args);
+    if (rowFilter!= null) {
+      LOG.info("Setting Row Filter for counter.");
+      s.setFilter(rowFilter);
+    }
+    // Set TimeRange if defined
+    long timeRange[] = getTimeRange(args);
+    if (timeRange != null) {
+      LOG.info("Setting TimeRange for counter.");
+      s.setTimeRange(timeRange[0], timeRange[1]);
+    }
+    return s;
+  }
+
+
+  private static Filter getRowFilter(String[] args) {
+    Filter rowFilter = null;
+    String filterCriteria = (args.length > 3) ? args[3]: null;
+    if (filterCriteria == null) return null;
+    if (filterCriteria.startsWith("^")) {
+      String regexPattern = filterCriteria.substring(1, filterCriteria.length());
+      rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
+    } else {
+      rowFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
+    }
+    return rowFilter;
+  }
+
+  private static long[] getTimeRange(String[] args) throws IOException {
+    final String startTimeArgKey = "--starttime=";
+    final String endTimeArgKey = "--endtime=";
+    long startTime = 0L;
+    long endTime = 0L;
+
+    for (int i = 1; i < args.length; i++) {
+      System.out.println("i:" + i + "arg[i]" + args[i]);
+      if (args[i].startsWith(startTimeArgKey)) {
+        startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
+      }
+      if (args[i].startsWith(endTimeArgKey)) {
+        endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
+      }
+    }
+
+    if (startTime == 0 && endTime == 0)
+      return null;
+
+    endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime;
+    return new long [] {startTime, endTime};
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("ERROR: Wrong number of parameters: " + args.length);
+      System.err.println("Usage: CellCounter ");
+      System.err.println("       <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
+        "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
+      System.err.println("  Note: -D properties will be applied to the conf used. ");
+      System.err.println("  Additionally, all of the SCAN properties from TableInputFormat");
+      System.err.println("  can be specified to get fine grained control on what is counted..");
+      System.err.println("   -D " + TableInputFormat.SCAN_ROW_START + "=<rowkey>");
+      System.err.println("   -D " + TableInputFormat.SCAN_ROW_STOP + "=<rowkey>");
+      System.err.println("   -D " + TableInputFormat.SCAN_COLUMNS + "=\"<col1> <col2>...\"");
+      System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
+      System.err.println("   -D " + TableInputFormat.SCAN_TIMESTAMP + "=<timestamp>");
+      System.err.println("   -D " + TableInputFormat.SCAN_TIMERANGE_START + "=<timestamp>");
+      System.err.println("   -D " + TableInputFormat.SCAN_TIMERANGE_END + "=<timestamp>");
+      System.err.println("   -D " + TableInputFormat.SCAN_MAXVERSIONS + "=<count>");
+      System.err.println("   -D " + TableInputFormat.SCAN_CACHEDROWS + "=<count>");
+      System.err.println("   -D " + TableInputFormat.SCAN_BATCHSIZE + "=<count>");
+      System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
+          "string : used to separate the rowId/column family name and qualifier name.");
+      System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
+          "operation to a limited subset of rows from the table based on regex or prefix pattern.");
+      return -1;
+    }
+    Job job = createSubmittableJob(getConf(), args);
+    return (job.waitForCompletion(true) ? 0 : 1);
+  }
+
+  /**
+   * Main entry point.
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int errCode = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(), args);
+    System.exit(errCode);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
new file mode 100644
index 0000000..1d4d37b
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Facade to create Cells for HFileOutputFormat. The created Cells are of <code>Put</code> type.
+ */
+@InterfaceAudience.Public
+public class CellCreator {
+
+  public static final String VISIBILITY_EXP_RESOLVER_CLASS =
+      "hbase.mapreduce.visibility.expression.resolver.class";
+
+  private VisibilityExpressionResolver visExpResolver;
+
+  public CellCreator(Configuration conf) {
+    Class<? extends VisibilityExpressionResolver> clazz = conf.getClass(
+        VISIBILITY_EXP_RESOLVER_CLASS, DefaultVisibilityExpressionResolver.class,
+        VisibilityExpressionResolver.class);
+    this.visExpResolver = ReflectionUtils.newInstance(clazz, conf);
+    this.visExpResolver.init();
+  }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @return created Cell
+   * @throws IOException
+   */
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength) throws IOException {
+    return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
+        timestamp, value, voffset, vlength, (List<Tag>)null);
+  }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @param visExpression visibility expression to be associated with cell
+   * @return created Cell
+   * @throws IOException
+   */
+  @Deprecated
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength, String visExpression) throws IOException {
+    List<Tag> visTags = null;
+    if (visExpression != null) {
+      visTags = this.visExpResolver.createVisibilityExpTags(visExpression);
+    }
+    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
+  }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @param tags
+   * @return created Cell
+   * @throws IOException
+   */
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength, List<Tag> tags) throws IOException {
+    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+  }
+
+  /**
+   * @return Visibility expression resolver
+   */
+  public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+    return this.visExpResolver;
+  }
+}