Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:46 UTC
[46/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
new file mode 100644
index 0000000..9811a97
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
@@ -0,0 +1,313 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A base for {@link TableInputFormat}s. Receives a {@link Table}, a
+ * byte[] of input columns and optionally a {@link Filter}.
+ * Subclasses may use other TableRecordReader implementations.
+ *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+ * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
+ * <p>
+ * An example of a subclass:
+ * <pre>
+ * class ExampleTIF extends TableInputFormatBase {
+ *
+ * {@literal @}Override
+ * protected void initialize(JobConf job) throws IOException {
+ * // We are responsible for the lifecycle of this connection until we hand it over in
+ * // initializeTable.
+ * Connection connection =
+ * ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ * TableName tableName = TableName.valueOf("exampleTable");
+ * // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ * initializeTable(connection, tableName);
+ * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ * Bytes.toBytes("columnB") };
+ * // mandatory
+ * setInputColumns(inputColumns);
+ * // optional, by default we'll get everything for the given columns.
+ * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ * setRowFilter(exampleFilter);
+ * }
+ * }
+ * </pre>
+ */
+
+@InterfaceAudience.Public
+public abstract class TableInputFormatBase
+implements InputFormat<ImmutableBytesWritable, Result> {
+ private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
+ private byte [][] inputColumns;
+ private Table table;
+ private RegionLocator regionLocator;
+ private Connection connection;
+ private TableRecordReader tableRecordReader;
+ private Filter rowFilter;
+
+ private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+ "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+ "method";
+ private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+ " previous error. Please look at the previous logs lines from" +
+ " the task's full log for more details.";
+
+ /**
+ * Builds a TableRecordReader. If no TableRecordReader was provided, uses
+ * the default.
+ *
+ * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+ * JobConf, Reporter)
+ */
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ // In case a subclass uses the deprecated approach or calls initializeTable directly
+ if (table == null) {
+ initialize(job);
+ }
+ // null check in case our child overrides getTable to not throw.
+ try {
+ if (getTable() == null) {
+ // initialize() must not have been implemented in the subclass.
+ throw new IOException(INITIALIZATION_ERROR);
+ }
+ } catch (IllegalStateException exception) {
+ throw new IOException(INITIALIZATION_ERROR, exception);
+ }
+
+ TableSplit tSplit = (TableSplit) split;
+ // if no table record reader was provided use default
+ final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
+ this.tableRecordReader;
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+ trr.init();
+ return new RecordReader<ImmutableBytesWritable, Result>() {
+
+ @Override
+ public void close() throws IOException {
+ trr.close();
+ closeTable();
+ }
+
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return trr.createKey();
+ }
+
+ @Override
+ public Result createValue() {
+ return trr.createValue();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return trr.getPos();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return trr.getProgress();
+ }
+
+ @Override
+ public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+ return trr.next(key, value);
+ }
+ };
+ }
+
+ /**
+ * Calculates the splits that will serve as input for the map tasks.
+ *
+ * Splits are created in number equal to the smaller of numSplits and
+ * the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table.
+ * If the number of splits is smaller than the number of
+ * {@link org.apache.hadoop.hbase.regionserver.HRegion}s, then each split spans
+ * multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s,
+ * grouped as evenly as possible. When the
+ * splits are uneven, the bigger splits are placed first in the
+ * {@link InputSplit} array.
+ *
+ * @param job the map task {@link JobConf}
+ * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
+ *
+ * @return the input splits
+ *
+ * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+ */
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ if (this.table == null) {
+ initialize(job);
+ }
+ // null check in case our child overrides getTable to not throw.
+ try {
+ if (getTable() == null) {
+ // initialize() must not have been implemented in the subclass.
+ throw new IOException(INITIALIZATION_ERROR);
+ }
+ } catch (IllegalStateException exception) {
+ throw new IOException(INITIALIZATION_ERROR, exception);
+ }
+
+ byte [][] startKeys = this.regionLocator.getStartKeys();
+ if (startKeys == null || startKeys.length == 0) {
+ throw new IOException("Expecting at least one region");
+ }
+ if (this.inputColumns == null || this.inputColumns.length == 0) {
+ throw new IOException("Expecting at least one column");
+ }
+ int realNumSplits = numSplits > startKeys.length ? startKeys.length : numSplits;
+ InputSplit[] splits = new InputSplit[realNumSplits];
+ int middle = startKeys.length / realNumSplits;
+ int startPos = 0;
+ for (int i = 0; i < realNumSplits; i++) {
+ int lastPos = startPos + middle;
+ lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+ String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]).
+ getHostname();
+ splits[i] = new TableSplit(this.table.getName(),
+ startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
+ HConstants.EMPTY_START_ROW, regionLocation);
+ LOG.info("split: " + i + "->" + splits[i]);
+ startPos = lastPos;
+ }
+ return splits;
+ }
+
+ /**
+ * Allows subclasses to initialize the table information.
+ *
+ * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close.
+ * @param tableName The {@link TableName} of the table to process.
+ * @throws IOException
+ */
+ protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+ if (this.table != null || this.connection != null) {
+ LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
+ "reference; TableInputFormatBase will not close these old references when done.");
+ }
+ this.table = connection.getTable(tableName);
+ this.regionLocator = connection.getRegionLocator(tableName);
+ this.connection = connection;
+ }
+
+ /**
+ * @param inputColumns the columns to be returned in the {@link Result} passed to the map task.
+ */
+ protected void setInputColumns(byte [][] inputColumns) {
+ this.inputColumns = inputColumns;
+ }
+
+ /**
+ * Allows subclasses to get the {@link Table}.
+ */
+ protected Table getTable() {
+ if (table == null) {
+ throw new IllegalStateException(NOT_INITIALIZED);
+ }
+ return this.table;
+ }
+
+ /**
+ * Allows subclasses to set the {@link TableRecordReader}.
+ *
+ * @param tableRecordReader the {@link TableRecordReader} to use,
+ * allowing other implementations to be substituted.
+ */
+ protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
+ }
+
+ /**
+ * Allows subclasses to set the {@link Filter} to be used.
+ *
+ * @param rowFilter the {@link Filter} to apply to each row.
+ */
+ protected void setRowFilter(Filter rowFilter) {
+ this.rowFilter = rowFilter;
+ }
+
+ /**
+ * Handle subclass specific set up.
+ * Each of the entry points used by the MapReduce framework,
+ * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+ * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information and calling
+ * {@link #initializeTable(Connection, TableName)}.
+ *
+ * Subclasses should implement their initialize call such that it is safe to call multiple times.
+ * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+ * if an initialize call is needed, but this behavior may change in the future. In particular,
+ * it is critical that initializeTable not be called multiple times since this will leak
+ * Connection instances.
+ *
+ */
+ protected void initialize(JobConf job) throws IOException {
+ }
+
+ /**
+ * Close the Table and related objects that were initialized via
+ * {@link #initializeTable(Connection, TableName)}.
+ *
+ * @throws IOException
+ */
+ protected void closeTable() throws IOException {
+ close(table, connection);
+ table = null;
+ connection = null;
+ }
+
+ private void close(Closeable... closables) throws IOException {
+ for (Closeable c : closables) {
+ if (c != null) { c.close(); }
+ }
+ }
+}
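For orientation when reviewing, here is a minimal sketch of the job-side wiring for a TableInputFormatBase subclass such as the ExampleTIF in the javadoc above. ExampleTIF, ExampleTableMap and the output path are illustrative assumptions, not part of this commit; the JobConf and JobClient calls are the stock mapred API.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ExampleTIFDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create());
    job.setJobName("example-tif");
    job.setInputFormat(ExampleTIF.class);        // subclass from the javadoc (hypothetical)
    job.setMapperClass(ExampleTableMap.class);   // any TableMap implementation (hypothetical)
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Result.class);
    job.setNumReduceTasks(0);                    // map-only job
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    JobClient.runJob(job);
  }
}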
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java
new file mode 100644
index 0000000..a9f1e61
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Mapper;
+
+/**
+ * A mapper over an HBase table: receives each row key and its {@link Result}
+ * as input and emits (K, V) pairs.
+ *
+ * @param <K> WritableComparable key class
+ * @param <V> Writable value class
+ */
+@InterfaceAudience.Public
+public interface TableMap<K extends WritableComparable<? super K>, V>
+extends Mapper<ImmutableBytesWritable, Result, K, V> {
+
+}
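Since TableMap itself adds no methods, an implementation is just a mapred Mapper keyed by row. A minimal identity-style sketch follows, with an illustrative class name (HBase ships a similar IdentityTableMap):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ExampleTableMap extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, Result> {

  // Pass each row key and its Result through unchanged.
  public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable, Result> output,
      Reporter reporter) throws IOException {
    output.collect(key, value);
  }
}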
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
new file mode 100644
index 0000000..63ec418
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -0,0 +1,376 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
+import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Utility for {@link TableMap} and {@link TableReduce}
+ */
+@InterfaceAudience.Public
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TableMapReduceUtil {
+
+ /**
+ * Use this before submitting a TableMap job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table The table name to read from.
+ * @param columns The columns to scan.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job configuration to adjust.
+ */
+ public static void initTableMapJob(String table, String columns,
+ Class<? extends TableMap> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, JobConf job) {
+ initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
+ true, TableInputFormat.class);
+ }
+
+ public static void initTableMapJob(String table, String columns,
+ Class<? extends TableMap> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
+ initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
+ addDependencyJars, TableInputFormat.class);
+ }
+
+ /**
+ * Use this before submitting a TableMap job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table The table name to read from.
+ * @param columns The columns to scan.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job configuration to adjust.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @param inputFormat the {@link InputFormat} implementation to use.
+ */
+ public static void initTableMapJob(String table, String columns,
+ Class<? extends TableMap> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
+ Class<? extends InputFormat> inputFormat) {
+
+ job.setInputFormat(inputFormat);
+ job.setMapOutputValueClass(outputValueClass);
+ job.setMapOutputKeyClass(outputKeyClass);
+ job.setMapperClass(mapper);
+ job.setStrings("io.serializations", job.get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName());
+ FileInputFormat.addInputPaths(job, table);
+ job.set(TableInputFormat.COLUMN_LIST, columns);
+ if (addDependencyJars) {
+ try {
+ addDependencyJars(job);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ initCredentials(job);
+ } catch (IOException ioe) {
+ // log and continue; this method does not declare IOException, for compatibility
+ ioe.printStackTrace();
+ }
+ }
+
+ /**
+ * Sets up the job for reading from one or more table snapshots, with one or more scans
+ * per snapshot.
+ * It bypasses HBase servers and reads directly from snapshot files.
+ *
+ * @param snapshotScans map of snapshot name to scans on that snapshot.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ */
+ public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+ Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+ JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
+ MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);
+
+ job.setInputFormat(MultiTableSnapshotInputFormat.class);
+ if (outputValueClass != null) {
+ job.setMapOutputValueClass(outputValueClass);
+ }
+ if (outputKeyClass != null) {
+ job.setMapOutputKeyClass(outputKeyClass);
+ }
+ job.setMapperClass(mapper);
+ if (addDependencyJars) {
+ addDependencyJars(job);
+ }
+
+ org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
+ }
+
+ /**
+ * Sets up the job for reading from a table snapshot. It bypasses HBase servers
+ * and reads directly from snapshot files.
+ *
+ * @param snapshotName The name of the snapshot (of a table) to read from.
+ * @param columns The columns to scan.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
+ * should have write permissions to this directory, and it should not be a subdirectory of
+ * rootdir. After the job is finished, the restore directory can be deleted.
+ * @throws IOException When setting up the details fails.
+ * @see TableSnapshotInputFormat
+ */
+ public static void initTableSnapshotMapJob(String snapshotName, String columns,
+ Class<? extends TableMap> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, JobConf job,
+ boolean addDependencyJars, Path tmpRestoreDir)
+ throws IOException {
+ TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
+ initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
+ addDependencyJars, TableSnapshotInputFormat.class);
+ org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
+ }
+
+ /**
+ * Use this before submitting a TableReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table The output table.
+ * @param reducer The reducer class to use.
+ * @param job The current job configuration to adjust.
+ * @throws IOException When determining the region count fails.
+ */
+ public static void initTableReduceJob(String table,
+ Class<? extends TableReduce> reducer, JobConf job)
+ throws IOException {
+ initTableReduceJob(table, reducer, job, null);
+ }
+
+ /**
+ * Use this before submitting a TableReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table The output table.
+ * @param reducer The reducer class to use.
+ * @param job The current job configuration to adjust.
+ * @param partitioner Partitioner to use. Pass <code>null</code> to use
+ * default partitioner.
+ * @throws IOException When determining the region count fails.
+ */
+ public static void initTableReduceJob(String table,
+ Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
+ throws IOException {
+ initTableReduceJob(table, reducer, job, partitioner, true);
+ }
+
+ /**
+ * Use this before submitting a TableReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table The output table.
+ * @param reducer The reducer class to use.
+ * @param job The current job configuration to adjust.
+ * @param partitioner Partitioner to use. Pass <code>null</code> to use
+ * default partitioner.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @throws IOException When determining the region count fails.
+ */
+ public static void initTableReduceJob(String table,
+ Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
+ boolean addDependencyJars) throws IOException {
+ job.setOutputFormat(TableOutputFormat.class);
+ job.setReducerClass(reducer);
+ job.set(TableOutputFormat.OUTPUT_TABLE, table);
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Put.class);
+ job.setStrings("io.serializations", job.get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName());
+ if (partitioner == HRegionPartitioner.class) {
+ job.setPartitionerClass(HRegionPartitioner.class);
+ int regions =
+ MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
+ if (job.getNumReduceTasks() > regions) {
+ job.setNumReduceTasks(regions);
+ }
+ } else if (partitioner != null) {
+ job.setPartitionerClass(partitioner);
+ }
+ if (addDependencyJars) {
+ addDependencyJars(job);
+ }
+ initCredentials(job);
+ }
+
+ public static void initCredentials(JobConf job) throws IOException {
+ UserProvider userProvider = UserProvider.instantiate(job);
+ if (userProvider.isHadoopSecurityEnabled()) {
+ // propagate delegation related props from launcher job to MR job
+ if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+ job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+ }
+ }
+
+ if (userProvider.isHBaseSecurityEnabled()) {
+ Connection conn = ConnectionFactory.createConnection(job);
+ try {
+ // login the server principal (if using secure Hadoop)
+ User user = userProvider.getCurrent();
+ TokenUtil.addTokenForJob(conn, job, user);
+ } catch (InterruptedException ie) {
+ ie.printStackTrace();
+ Thread.currentThread().interrupt();
+ } finally {
+ conn.close();
+ }
+ }
+ }
+
+ /**
+ * Ensures that the given number of reduce tasks for the given job
+ * configuration does not exceed the number of regions for the given table.
+ *
+ * @param table The table to get the region count for.
+ * @param job The current job configuration to adjust.
+ * @throws IOException When retrieving the table details fails.
+ */
+ // Used by tests.
+ public static void limitNumReduceTasks(String table, JobConf job)
+ throws IOException {
+ int regions =
+ MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
+ if (job.getNumReduceTasks() > regions) {
+ job.setNumReduceTasks(regions);
+ }
+ }
+
+ /**
+ * Ensures that the given number of map tasks for the given job
+ * configuration does not exceed the number of regions for the given table.
+ *
+ * @param table The table to get the region count for.
+ * @param job The current job configuration to adjust.
+ * @throws IOException When retrieving the table details fails.
+ */
+ // Used by tests.
+ public static void limitNumMapTasks(String table, JobConf job)
+ throws IOException {
+ int regions =
+ MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
+ if (job.getNumMapTasks() > regions) {
+ job.setNumMapTasks(regions);
+ }
+ }
+
+ /**
+ * Sets the number of reduce tasks for the given job configuration to the
+ * number of regions the given table has.
+ *
+ * @param table The table to get the region count for.
+ * @param job The current job configuration to adjust.
+ * @throws IOException When retrieving the table details fails.
+ */
+ public static void setNumReduceTasks(String table, JobConf job)
+ throws IOException {
+ job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
+ TableName.valueOf(table)));
+ }
+
+ /**
+ * Sets the number of map tasks for the given job configuration to the
+ * number of regions the given table has.
+ *
+ * @param table The table to get the region count for.
+ * @param job The current job configuration to adjust.
+ * @throws IOException When retrieving the table details fails.
+ */
+ public static void setNumMapTasks(String table, JobConf job)
+ throws IOException {
+ job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
+ TableName.valueOf(table)));
+ }
+
+ /**
+ * Sets the number of rows to return and cache with each scanner iteration.
+ * Higher caching values will enable faster mapreduce jobs at the expense of
+ * requiring more heap to contain the cached rows.
+ *
+ * @param job The current job configuration to adjust.
+ * @param batchSize The number of rows to return in batch with each scanner
+ * iteration.
+ */
+ public static void setScannerCaching(JobConf job, int batchSize) {
+ job.setInt("hbase.client.scanner.caching", batchSize);
+ }
+
+ /**
+ * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
+ */
+ public static void addDependencyJars(JobConf job) throws IOException {
+ org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
+ org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(
+ job,
+ // when making changes here, consider also mapreduce.TableMapReduceUtil
+ // pull job classes
+ job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(),
+ job.getOutputKeyClass(),
+ job.getOutputValueClass(),
+ job.getPartitionerClass(),
+ job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
+ job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
+ job.getCombinerClass());
+ }
+}
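A sketch of how these utilities are typically called from a driver. The table names, the column spec "f:q", and the mapper/reducer classes are placeholders; the mapper is assumed to emit (row, Put) pairs for the reducer to forward.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ExampleCopyDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create());
    job.setJobName("example-copy");
    // Read column f:q from sourceTable; the mapper emits (row, Put) pairs.
    TableMapReduceUtil.initTableMapJob("sourceTable", "f:q",
        ExamplePutMapper.class, ImmutableBytesWritable.class, Put.class, job);
    // Write the reducer's Puts into targetTable.
    TableMapReduceUtil.initTableReduceJob("targetTable",
        ExampleTableReduce.class, job);
    JobClient.runJob(job);
  }
}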
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
new file mode 100644
index 0000000..06b28ed
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Convert Map/Reduce output and write it to an HBase table
+ */
+@InterfaceAudience.Public
+public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
+
+ /** JobConf parameter that specifies the output table */
+ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+
+ /**
+ * Convert Reduce output (key, value) pairs to {@link Put}s
+ * and write them to an HBase table.
+ */
+ protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
+ private BufferedMutator m_mutator;
+ private Connection conn;
+
+
+ /**
+ * Instantiate a TableRecordWriter with the given BufferedMutator for writing.
+ *
+ * @deprecated Please use {@link #TableRecordWriter(JobConf)}. This version does not clean up
+ * connections and will leak connections (removed in 2.0)
+ */
+ @Deprecated
+ public TableRecordWriter(final BufferedMutator mutator) throws IOException {
+ this.m_mutator = mutator;
+ this.conn = null;
+ }
+
+ /**
+ * Instantiate a TableRecordWriter with a BufferedMutator for batch writing.
+ */
+ public TableRecordWriter(JobConf job) throws IOException {
+ // the output table name is carried in the job configuration
+ TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
+ try {
+ this.conn = ConnectionFactory.createConnection(job);
+ this.m_mutator = conn.getBufferedMutator(tableName);
+ } finally {
+ if (this.m_mutator == null) {
+ conn.close();
+ conn = null;
+ }
+ }
+ }
+
+ public void close(Reporter reporter) throws IOException {
+ try {
+ if (this.m_mutator != null) {
+ this.m_mutator.close();
+ }
+ } finally {
+ if (conn != null) {
+ this.conn.close();
+ }
+ }
+ }
+
+ public void write(ImmutableBytesWritable key, Put value) throws IOException {
+ m_mutator.mutate(new Put(value));
+ }
+ }
+
+ /**
+ * Creates a new record writer.
+ *
+ * Be aware that the baseline javadoc gives the impression that there is a single
+ * {@link RecordWriter} per job, but in HBase a new RecordWriter is returned per call
+ * of this method. You must close the returned RecordWriter when done.
+ * Failure to do so will drop writes.
+ *
+ * @param ignored Ignored filesystem
+ * @param job Current JobConf
+ * @param name Name of the output (unused by this implementation)
+ * @param progress The progress reporter
+ * @return The newly created writer instance.
+ * @throws IOException When creating the writer fails.
+ */
+ @Override
+ public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
+ Progressable progress)
+ throws IOException {
+ // Clear write buffer on fail is true by default so no need to reset it.
+ return new TableRecordWriter(job);
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+ String tableName = job.get(OUTPUT_TABLE);
+ if (tableName == null) {
+ throw new IOException("Must specify table name");
+ }
+ }
+}
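For reference, the JobConf wiring TableOutputFormat expects, equivalent to what TableMapReduceUtil.initTableReduceJob sets up for the output side; the table name is a placeholder.

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class ExampleOutputWiring {
  static void configureOutput(JobConf job) {
    job.setOutputFormat(TableOutputFormat.class);
    job.set(TableOutputFormat.OUTPUT_TABLE, "targetTable"); // placeholder name
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
  }
}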
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
new file mode 100644
index 0000000..cecef7d
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.RecordReader;
+
+
+/**
+ * Iterate over HBase table data and return (ImmutableBytesWritable, Result) pairs.
+ */
+@InterfaceAudience.Public
+public class TableRecordReader
+implements RecordReader<ImmutableBytesWritable, Result> {
+
+ private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
+
+ /**
+ * Restart from survivable exceptions by creating a new scanner.
+ *
+ * @param firstRow the row to restart the scanner from.
+ * @throws IOException
+ */
+ public void restart(byte[] firstRow) throws IOException {
+ this.recordReaderImpl.restart(firstRow);
+ }
+
+ /**
+ * Build the scanner. Not done in constructor to allow for extension.
+ *
+ * @throws IOException
+ */
+ public void init() throws IOException {
+ this.recordReaderImpl.restart(this.recordReaderImpl.getStartRow());
+ }
+
+ /**
+ * @param htable the {@link org.apache.hadoop.hbase.client.Table} to scan.
+ */
+ public void setHTable(Table htable) {
+ this.recordReaderImpl.setHTable(htable);
+ }
+
+ /**
+ * @param inputColumns the columns to be placed in {@link Result}.
+ */
+ public void setInputColumns(final byte [][] inputColumns) {
+ this.recordReaderImpl.setInputColumns(inputColumns);
+ }
+
+ /**
+ * @param startRow the first row in the split
+ */
+ public void setStartRow(final byte [] startRow) {
+ this.recordReaderImpl.setStartRow(startRow);
+ }
+
+ /**
+ *
+ * @param endRow the last row in the split
+ */
+ public void setEndRow(final byte [] endRow) {
+ this.recordReaderImpl.setEndRow(endRow);
+ }
+
+ /**
+ * @param rowFilter the {@link Filter} to be used.
+ */
+ public void setRowFilter(Filter rowFilter) {
+ this.recordReaderImpl.setRowFilter(rowFilter);
+ }
+
+ public void close() {
+ this.recordReaderImpl.close();
+ }
+
+ /**
+ * @return ImmutableBytesWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ public ImmutableBytesWritable createKey() {
+ return this.recordReaderImpl.createKey();
+ }
+
+ /**
+ * @return Result
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ public Result createValue() {
+ return this.recordReaderImpl.createValue();
+ }
+
+ public long getPos() {
+ // This should be the ordinal tuple in the range;
+ // not clear how to calculate...
+ return this.recordReaderImpl.getPos();
+ }
+
+ public float getProgress() {
+ // Depends on the total number of tuples and getPos
+ return this.recordReaderImpl.getProgress();
+ }
+
+ /**
+ * @param key ImmutableBytesWritable as input key.
+ * @param value Result as input value.
+ * @return true if there was more data
+ * @throws IOException
+ */
+ public boolean next(ImmutableBytesWritable key, Result value)
+ throws IOException {
+ return this.recordReaderImpl.next(key, value);
+ }
+}
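The reader lifecycle, as driven by TableInputFormatBase.getRecordReader above, is roughly the following sketch; the table, columns, and row-range arguments stand in for values that normally come from the split and job configuration.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableRecordReader;

public class ExampleReaderLifecycle {
  static void scan(Table table, byte[][] columns, byte[] start, byte[] end)
      throws IOException {
    TableRecordReader trr = new TableRecordReader();
    trr.setHTable(table);
    trr.setInputColumns(columns);
    trr.setStartRow(start);
    trr.setEndRow(end);
    trr.init();                          // builds the scanner
    ImmutableBytesWritable key = trr.createKey();
    Result value = trr.createValue();
    try {
      while (trr.next(key, value)) {
        // process one row here
      }
    } finally {
      trr.close();                       // closes the scanner and table
    }
  }
}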
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
new file mode 100644
index 0000000..f6b79c3
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
@@ -0,0 +1,259 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
+
+/**
+ * Iterate over HBase table data and return (ImmutableBytesWritable, Result) pairs.
+ */
+@InterfaceAudience.Public
+public class TableRecordReaderImpl {
+ private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
+
+ private byte [] startRow;
+ private byte [] endRow;
+ private byte [] lastSuccessfulRow;
+ private Filter trrRowFilter;
+ private ResultScanner scanner;
+ private Table htable;
+ private byte [][] trrInputColumns;
+ private long timestamp;
+ private int rowcount;
+ private boolean logScannerActivity = false;
+ private int logPerRowCount = 100;
+
+ /**
+ * Restart from survivable exceptions by creating a new scanner.
+ *
+ * @param firstRow the row to restart the scanner from.
+ * @throws IOException
+ */
+ public void restart(byte[] firstRow) throws IOException {
+ Scan currentScan;
+ if ((endRow != null) && (endRow.length > 0)) {
+ if (trrRowFilter != null) {
+ Scan scan = new Scan(firstRow, endRow);
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setFilter(trrRowFilter);
+ scan.setCacheBlocks(false);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ } else {
+ LOG.debug("TIFB.restart, firstRow: " +
+ Bytes.toStringBinary(firstRow) + ", endRow: " +
+ Bytes.toStringBinary(endRow));
+ Scan scan = new Scan(firstRow, endRow);
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ }
+ } else {
+ LOG.debug("TIFB.restart, firstRow: " +
+ Bytes.toStringBinary(firstRow) + ", no endRow");
+
+ Scan scan = new Scan(firstRow);
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setFilter(trrRowFilter);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ }
+ if (logScannerActivity) {
+ LOG.info("Current scan=" + currentScan.toString());
+ timestamp = System.currentTimeMillis();
+ rowcount = 0;
+ }
+ }
+
+ /**
+ * Build the scanner. Not done in constructor to allow for extension.
+ *
+ * @throws IOException
+ */
+ public void init() throws IOException {
+ restart(startRow);
+ }
+
+ byte[] getStartRow() {
+ return this.startRow;
+ }
+
+ /**
+ * @param htable the {@link org.apache.hadoop.hbase.client.Table} to scan.
+ */
+ public void setHTable(Table htable) {
+ Configuration conf = htable.getConfiguration();
+ logScannerActivity = conf.getBoolean(
+ ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+ logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
+ this.htable = htable;
+ }
+
+ /**
+ * @param inputColumns the columns to be placed in {@link Result}.
+ */
+ public void setInputColumns(final byte [][] inputColumns) {
+ this.trrInputColumns = inputColumns;
+ }
+
+ /**
+ * @param startRow the first row in the split
+ */
+ public void setStartRow(final byte [] startRow) {
+ this.startRow = startRow;
+ }
+
+ /**
+ *
+ * @param endRow the last row in the split
+ */
+ public void setEndRow(final byte [] endRow) {
+ this.endRow = endRow;
+ }
+
+ /**
+ * @param rowFilter the {@link Filter} to be used.
+ */
+ public void setRowFilter(Filter rowFilter) {
+ this.trrRowFilter = rowFilter;
+ }
+
+ public void close() {
+ if (this.scanner != null) {
+ this.scanner.close();
+ }
+ try {
+ this.htable.close();
+ } catch (IOException ioe) {
+ LOG.warn("Error closing table", ioe);
+ }
+ }
+
+ /**
+ * @return ImmutableBytesWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ public ImmutableBytesWritable createKey() {
+ return new ImmutableBytesWritable();
+ }
+
+ /**
+ * @return Result
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ public Result createValue() {
+ return new Result();
+ }
+
+ public long getPos() {
+ // This should be the ordinal tuple in the range;
+ // not clear how to calculate...
+ return 0;
+ }
+
+ public float getProgress() {
+ // Depends on the total number of tuples and getPos
+ return 0;
+ }
+
+ /**
+ * @param key ImmutableBytesWritable as input key.
+ * @param value Result as input value.
+ * @return true if there was more data
+ * @throws IOException
+ */
+ public boolean next(ImmutableBytesWritable key, Result value)
+ throws IOException {
+ Result result;
+ try {
+ try {
+ result = this.scanner.next();
+ if (logScannerActivity) {
+ rowcount++;
+ if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ LOG.info("Mapper took " + (now-timestamp)
+ + "ms to process " + rowcount + " rows");
+ timestamp = now;
+ rowcount = 0;
+ }
+ }
+ } catch (IOException e) {
+ // do not retry if the exception tells us not to do so
+ if (e instanceof DoNotRetryIOException) {
+ throw e;
+ }
+ // try to handle all other IOExceptions by restarting
+ // the scanner, if the second call fails, it will be rethrown
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ if (lastSuccessfulRow == null) {
+ LOG.warn("We are restarting the first next() invocation," +
+ " if your mapper has restarted a few other times like this" +
+ " then you should consider killing this job and investigate" +
+ " why it's taking so long.");
+ restart(startRow);
+ } else {
+ restart(lastSuccessfulRow);
+ this.scanner.next(); // skip presumed already mapped row
+ }
+ result = this.scanner.next();
+ }
+
+ if (result != null && result.size() > 0) {
+ key.set(result.getRow());
+ lastSuccessfulRow = key.get();
+ value.copyFrom(result);
+ return true;
+ }
+ return false;
+ } catch (IOException ioe) {
+ if (logScannerActivity) {
+ long now = System.currentTimeMillis();
+ LOG.info("Mapper took " + (now-timestamp)
+ + "ms to process " + rowcount + " rows");
+ LOG.info(ioe);
+ String lastRow = lastSuccessfulRow == null ?
+ "null" : Bytes.toStringBinary(lastSuccessfulRow);
+ LOG.info("lastSuccessfulRow=" + lastRow);
+ }
+ throw ioe;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java
new file mode 100644
index 0000000..91fb4a1
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * Write to an HBase table, sorting by the input key.
+ *
+ * @param <K> key class
+ * @param <V> value class
+ */
+@InterfaceAudience.Public
+@SuppressWarnings("unchecked")
+public interface TableReduce<K extends WritableComparable, V>
+extends Reducer<K, V, ImmutableBytesWritable, Put> {
+
+}
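Like TableMap, this interface adds no methods; here is a minimal sketch that forwards each incoming Put to the output table, with an illustrative class name (HBase ships a similar IdentityTableReduce):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ExampleTableReduce extends MapReduceBase
    implements TableReduce<ImmutableBytesWritable, Put> {

  // Forward every Put; TableOutputFormat turns these into table writes.
  public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
      OutputCollector<ImmutableBytesWritable, Put> output,
      Reporter reporter) throws IOException {
    while (values.hasNext()) {
      output.collect(key, values.next());
    }
  }
}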
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
new file mode 100644
index 0000000..d7b49ff
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. Further
+ * documentation available on {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}.
+ *
+ * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ */
+@InterfaceAudience.Public
+public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
+
+ public static class TableSnapshotRegionSplit implements InputSplit {
+ private TableSnapshotInputFormatImpl.InputSplit delegate;
+
+ // constructor for mapreduce framework / Writable
+ public TableSnapshotRegionSplit() {
+ this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
+ }
+
+ public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
+ this.delegate = delegate;
+ }
+
+ public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
+ List<String> locations, Scan scan, Path restoreDir) {
+ this.delegate =
+ new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return delegate.getLength();
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return delegate.getLocations();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ delegate.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ delegate.readFields(in);
+ }
+ }
+
+ static class TableSnapshotRecordReader
+ implements RecordReader<ImmutableBytesWritable, Result> {
+
+ private TableSnapshotInputFormatImpl.RecordReader delegate;
+
+ public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
+ throws IOException {
+ delegate = new TableSnapshotInputFormatImpl.RecordReader();
+ delegate.initialize(split.delegate, job);
+ }
+
+ @Override
+ public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+ if (!delegate.nextKeyValue()) {
+ return false;
+ }
+ ImmutableBytesWritable currentKey = delegate.getCurrentKey();
+ key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
+ value.copyFrom(delegate.getCurrentValue());
+ return true;
+ }
+
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return new ImmutableBytesWritable();
+ }
+
+ @Override
+ public Result createValue() {
+ return new Result();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return delegate.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return delegate.getProgress();
+ }
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ List<TableSnapshotInputFormatImpl.InputSplit> splits =
+ TableSnapshotInputFormatImpl.getSplits(job);
+ InputSplit[] results = new InputSplit[splits.size()];
+ for (int i = 0; i < splits.size(); i++) {
+ results[i] = new TableSnapshotRegionSplit(splits.get(i));
+ }
+ return results;
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result>
+ getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
+ }
+
+ /**
+ * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+ * @param job the job to configure
+ * @param snapshotName the name of the snapshot to read from
+ * @param restoreDir a temporary directory to restore the snapshot into. The current user should
+ * have write permissions to this directory, and it should not be a subdirectory of rootdir.
+ * After the job is finished, restoreDir can be deleted.
+ * @throws IOException if an error occurs
+ */
+ public static void setInput(JobConf job, String snapshotName, Path restoreDir)
+ throws IOException {
+ TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
+ }
+}
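A sketch of wiring a snapshot-reading job through TableMapReduceUtil.initTableSnapshotMapJob above; the snapshot name, column spec, restore directory, and mapper class are placeholders.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ExampleSnapshotDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create());
    // Must be writable by the current user and outside the HBase rootdir.
    Path restoreDir = new Path("/tmp/snapshot-restore");
    TableMapReduceUtil.initTableSnapshotMapJob(
        "exampleSnapshot",             // snapshot to read
        "f:q",                         // columns to scan
        ExampleTableMap.class,         // a TableMap implementation (hypothetical)
        ImmutableBytesWritable.class,  // map output key
        Result.class,                  // map output value
        job,
        true,                          // ship dependency jars
        restoreDir);
    JobClient.runJob(job);
  }
}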
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
new file mode 100644
index 0000000..0784e5e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
@@ -0,0 +1,154 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A table split corresponds to a key range [low, high)
+ */
+@InterfaceAudience.Public
+public class TableSplit implements InputSplit, Comparable<TableSplit> {
+ private TableName m_tableName;
+ private byte [] m_startRow;
+ private byte [] m_endRow;
+ private String m_regionLocation;
+
+ /** default constructor */
+ public TableSplit() {
+ this((TableName)null, HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY, "");
+ }
+
+ /**
+ * Constructor
+ * @param tableName name of the table this split scans
+ * @param startRow start row key of the split (inclusive)
+ * @param endRow end row key of the split (exclusive)
+ * @param location hostname of the region server hosting the split
+ */
+ public TableSplit(TableName tableName, byte [] startRow, byte [] endRow,
+ final String location) {
+ this.m_tableName = tableName;
+ this.m_startRow = startRow;
+ this.m_endRow = endRow;
+ this.m_regionLocation = location;
+ }
+
+ /** Constructor taking the table name as bytes rather than a {@link TableName}. */
+ public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+ final String location) {
+ this(TableName.valueOf(tableName), startRow, endRow,
+ location);
+ }
+
+ /** @return table name */
+ public TableName getTable() {
+ return this.m_tableName;
+ }
+
+ /** @return table name as bytes */
+ public byte [] getTableName() {
+ return this.m_tableName.getName();
+ }
+
+ /** @return starting row key */
+ public byte [] getStartRow() {
+ return this.m_startRow;
+ }
+
+ /** @return end row key */
+ public byte [] getEndRow() {
+ return this.m_endRow;
+ }
+
+ /** @return the region's hostname */
+ public String getRegionLocation() {
+ return this.m_regionLocation;
+ }
+
+ public String[] getLocations() {
+ return new String[] {this.m_regionLocation};
+ }
+
+ public long getLength() {
+ // Not clear how to obtain this... seems to be used only for sorting splits
+ return 0;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.m_tableName = TableName.valueOf(Bytes.readByteArray(in));
+ this.m_startRow = Bytes.readByteArray(in);
+ this.m_endRow = Bytes.readByteArray(in);
+ this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Bytes.writeByteArray(out, this.m_tableName.getName());
+ Bytes.writeByteArray(out, this.m_startRow);
+ Bytes.writeByteArray(out, this.m_endRow);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("HBase table split(");
+ sb.append("table name: ").append(m_tableName);
+ sb.append(", start row: ").append(Bytes.toStringBinary(m_startRow));
+ sb.append(", end row: ").append(Bytes.toStringBinary(m_endRow));
+ sb.append(", region location: ").append(m_regionLocation);
+ sb.append(")");
+ return sb.toString();
+ }
+
+ @Override
+ public int compareTo(TableSplit o) {
+ return Bytes.compareTo(getStartRow(), o.getStartRow());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof TableSplit)) {
+ return false;
+ }
+ TableSplit other = (TableSplit)o;
+ return m_tableName.equals(other.m_tableName) &&
+ Bytes.equals(m_startRow, other.m_startRow) &&
+ Bytes.equals(m_endRow, other.m_endRow) &&
+ m_regionLocation.equals(other.m_regionLocation);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = m_tableName != null ? m_tableName.hashCode() : 0;
+ result = 31 * result + Arrays.hashCode(m_startRow);
+ result = 31 * result + Arrays.hashCode(m_endRow);
+ result = 31 * result + (m_regionLocation != null ? m_regionLocation.hashCode() : 0);
+ return result;
+ }
+}
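Since TableSplit implements the write()/readFields() pair shown above, a quick
round-trip sketch illustrates how the framework serializes a split between the
client and the tasks; the table name, row keys, and hostname are illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mapred.TableSplit;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TableSplitRoundTrip {
      public static void main(String[] args) throws Exception {
        TableSplit out = new TableSplit(TableName.valueOf("t1"),
            Bytes.toBytes("row-aaa"), Bytes.toBytes("row-mmm"), "host1.example.com");
        // Serialize the split into an in-memory buffer.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buf));

        // Deserialize into a fresh split and verify equality.
        TableSplit in = new TableSplit();
        in.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(out.equals(in)); // prints: true
      }
    }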
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
new file mode 100644
index 0000000..1da3a52
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
+Input/OutputFormats, a table indexing MapReduce job, and utility methods.
+
+<p>See <a href="http://hbase.apache.org/book.html#mapreduce">HBase and MapReduce</a>
+in the HBase Reference Guide for documentation on using MapReduce with HBase.
+*/
+package org.apache.hadoop.hbase.mapred;
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
new file mode 100644
index 0000000..078033e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
@@ -0,0 +1,333 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+
+
+/**
+ * A job with a map and a reduce phase to count cells in a table.
+ * The counter lists the following stats for a given table:
+ * <pre>
+ * 1. Total number of rows in the table
+ * 2. Total number of CFs across all rows
+ * 3. Total qualifiers across all rows
+ * 4. Total occurrence of each CF
+ * 5. Total occurrence of each qualifier
+ * 6. Total number of versions of each qualifier.
+ * </pre>
+ *
+ * The cell counter can take optional parameters: a user-supplied separator
+ * string for the rowkey/family/qualifier names in the report, a regex-based
+ * or prefix-based row filter to restrict the count operation to a subset of
+ * the table's rows, and a start time and/or end time to limit the count to
+ * a time range.
+ */
+@InterfaceAudience.Public
+public class CellCounter extends Configured implements Tool {
+ private static final Log LOG =
+ LogFactory.getLog(CellCounter.class.getName());
+
+
+ /**
+ * Name of this 'program'.
+ */
+ static final String NAME = "CellCounter";
+
+ private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+ /**
+ * Mapper that runs the count.
+ */
+ static class CellCounterMapper
+ extends TableMapper<Text, IntWritable> {
+ /**
+ * Counter enumeration to count the actual rows.
+ */
+ public static enum Counters {
+ ROWS,
+ CELLS
+ }
+
+ private Configuration conf;
+ private String separator;
+
+ // state of current row, family, column needs to persist across map() invocations
+ // in order to properly handle scanner batching, where a single qualifier may have too
+ // many versions for a single map() call
+ private byte[] lastRow;
+ private String currentRowKey;
+ byte[] currentFamily = null;
+ String currentFamilyName = null;
+ byte[] currentQualifier = null;
+ // family + qualifier
+ String currentQualifierName = null;
+ // rowkey + family + qualifier
+ String currentRowQualifierName = null;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ conf = context.getConfiguration();
+ separator = conf.get("ReportSeparator",":");
+ }
+
+ /**
+ * Maps the data.
+ *
+ * @param row The current table row key.
+ * @param values The columns.
+ * @param context The current context.
+ * @throws IOException When something is broken with the data.
+ * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+ * org.apache.hadoop.mapreduce.Mapper.Context)
+ */
+
+ @Override
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
+ justification="Findbugs is blind to the Precondition null check")
+ public void map(ImmutableBytesWritable row, Result values,
+ Context context)
+ throws IOException {
+ Preconditions.checkState(values != null,
+ "values passed to the map is null");
+
+ try {
+ byte[] currentRow = values.getRow();
+ if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
+ lastRow = currentRow;
+ currentRowKey = Bytes.toStringBinary(currentRow);
+ currentFamily = null;
+ currentQualifier = null;
+ context.getCounter(Counters.ROWS).increment(1);
+ context.write(new Text("Total ROWS"), new IntWritable(1));
+ }
+ if (!values.isEmpty()) {
+ int cellCount = 0;
+ for (Cell value : values.listCells()) {
+ cellCount++;
+ if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) {
+ currentFamily = CellUtil.cloneFamily(value);
+ currentFamilyName = Bytes.toStringBinary(currentFamily);
+ currentQualifier = null;
+ context.getCounter("CF", currentFamilyName).increment(1);
+ if (1 == context.getCounter("CF", currentFamilyName).getValue()) {
+ context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
+ context.write(new Text(currentFamily), new IntWritable(1));
+ }
+ }
+ if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) {
+ currentQualifier = CellUtil.cloneQualifier(value);
+ currentQualifierName = currentFamilyName + separator +
+ Bytes.toStringBinary(currentQualifier);
+ currentRowQualifierName = currentRowKey + separator + currentQualifierName;
+
+ context.write(new Text("Total Qualifiers across all Rows"),
+ new IntWritable(1));
+ context.write(new Text(currentQualifierName), new IntWritable(1));
+ }
+ // Increment versions
+ context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1));
+ }
+ context.getCounter(Counters.CELLS).increment(cellCount);
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt status rather than swallowing the exception.
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while emitting cell counts", e);
+ }
+ }
+ }
+
+ static class IntSumReducer<Key> extends Reducer<Key, IntWritable,
+ Key, IntWritable> {
+
+ private IntWritable result = new IntWritable();
+ public void reduce(Key key, Iterable<IntWritable> values,
+ Context context)
+ throws IOException, InterruptedException {
+ int sum = 0;
+ for (IntWritable val : values) {
+ sum += val.get();
+ }
+ result.set(sum);
+ context.write(key, result);
+ }
+ }
+
+ /**
+ * Sets up the actual job.
+ *
+ * @param conf The current configuration.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws IOException When setting up the job fails.
+ */
+ public static Job createSubmittableJob(Configuration conf, String[] args)
+ throws IOException {
+ String tableName = args[0];
+ Path outputDir = new Path(args[1]);
+ String reportSeparatorString = (args.length > 2) ? args[2]: ":";
+ conf.set("ReportSeparator", reportSeparatorString);
+ Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+ job.setJarByClass(CellCounter.class);
+ Scan scan = getConfiguredScanForJob(conf, args);
+ TableMapReduceUtil.initTableMapperJob(tableName, scan,
+ CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
+ job.setNumReduceTasks(1);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setReducerClass(IntSumReducer.class);
+ return job;
+ }
+
+ private static Scan getConfiguredScanForJob(Configuration conf, String[] args)
+ throws IOException {
+ // create scan with any properties set from TableInputFormat
+ Scan s = TableInputFormat.createScanFromConfiguration(conf);
+ // Set Scan Versions
+ if (conf.get(TableInputFormat.SCAN_MAXVERSIONS) == null) {
+ // default to all versions unless explicitly set
+ s.setMaxVersions(Integer.MAX_VALUE);
+ }
+ s.setCacheBlocks(false);
+ // Set RowFilter or Prefix Filter if applicable.
+ Filter rowFilter = getRowFilter(args);
+ if (rowFilter != null) {
+ LOG.info("Setting Row Filter for counter.");
+ s.setFilter(rowFilter);
+ }
+ // Set TimeRange if defined
+ long[] timeRange = getTimeRange(args);
+ if (timeRange != null) {
+ LOG.info("Setting TimeRange for counter.");
+ s.setTimeRange(timeRange[0], timeRange[1]);
+ }
+ return s;
+ }
+
+
+ private static Filter getRowFilter(String[] args) {
+ Filter rowFilter = null;
+ String filterCriteria = (args.length > 3) ? args[3]: null;
+ if (filterCriteria == null) return null;
+ if (filterCriteria.startsWith("^")) {
+ String regexPattern = filterCriteria.substring(1);
+ rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
+ new RegexStringComparator(regexPattern));
+ } else {
+ rowFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
+ }
+ return rowFilter;
+ }
+
+ private static long[] getTimeRange(String[] args) throws IOException {
+ final String startTimeArgKey = "--starttime=";
+ final String endTimeArgKey = "--endtime=";
+ long startTime = 0L;
+ long endTime = 0L;
+
+ for (int i = 1; i < args.length; i++) {
+ System.out.println("i:" + i + "arg[i]" + args[i]);
+ if (args[i].startsWith(startTimeArgKey)) {
+ startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
+ }
+ if (args[i].startsWith(endTimeArgKey)) {
+ endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
+ }
+ }
+
+ if (startTime == 0 && endTime == 0)
+ return null;
+
+ endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime;
+ return new long [] {startTime, endTime};
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("ERROR: Wrong number of parameters: " + args.length);
+ System.err.println("Usage: CellCounter ");
+ System.err.println(" <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
+ "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
+ System.err.println(" Note: -D properties will be applied to the conf used. ");
+ System.err.println(" Additionally, all of the SCAN properties from TableInputFormat");
+ System.err.println(" can be specified to get fine grained control on what is counted..");
+ System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<rowkey>");
+ System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<rowkey>");
+ System.err.println(" -D " + TableInputFormat.SCAN_COLUMNS + "=\"<col1> <col2>...\"");
+ System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
+ System.err.println(" -D " + TableInputFormat.SCAN_TIMESTAMP + "=<timestamp>");
+ System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_START + "=<timestamp>");
+ System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_END + "=<timestamp>");
+ System.err.println(" -D " + TableInputFormat.SCAN_MAXVERSIONS + "=<count>");
+ System.err.println(" -D " + TableInputFormat.SCAN_CACHEDROWS + "=<count>");
+ System.err.println(" -D " + TableInputFormat.SCAN_BATCHSIZE + "=<count>");
+ System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
+ "string : used to separate the rowId/column family name and qualifier name.");
+ System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
+ "operation to a limited subset of rows from the table based on regex or prefix pattern.");
+ return -1;
+ }
+ Job job = createSubmittableJob(getConf(), args);
+ return (job.waitForCompletion(true) ? 0 : 1);
+ }
+
+ /**
+ * Main entry point.
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ int errCode = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(), args);
+ System.exit(errCode);
+ }
+
+}
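Because CellCounter is a Hadoop Tool, it can also be driven programmatically
through ToolRunner, mirroring main() above; a minimal sketch, in which the
table name, output path, and separator are illustrative placeholders:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.CellCounter;
    import org.apache.hadoop.util.ToolRunner;

    public class CellCounterDriver {
      public static void main(String[] args) throws Exception {
        // Count cells in table "users", writing the report to /tmp/cellcount
        // and using ":" as the report separator.
        int rc = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(),
            new String[] { "users", "/tmp/cellcount", ":" });
        System.exit(rc);
      }
    }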
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
new file mode 100644
index 0000000..1d4d37b
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Facade to create Cells for HFileOutputFormat. The created Cells are of <code>Put</code> type.
+ */
+@InterfaceAudience.Public
+public class CellCreator {
+
+ public static final String VISIBILITY_EXP_RESOLVER_CLASS =
+ "hbase.mapreduce.visibility.expression.resolver.class";
+
+ private VisibilityExpressionResolver visExpResolver;
+
+ public CellCreator(Configuration conf) {
+ Class<? extends VisibilityExpressionResolver> clazz = conf.getClass(
+ VISIBILITY_EXP_RESOLVER_CLASS, DefaultVisibilityExpressionResolver.class,
+ VisibilityExpressionResolver.class);
+ this.visExpResolver = ReflectionUtils.newInstance(clazz, conf);
+ this.visExpResolver.init();
+ }
+
+ /**
+ * @param row row key
+ * @param roffset row offset
+ * @param rlength row length
+ * @param family family name
+ * @param foffset family offset
+ * @param flength family length
+ * @param qualifier column qualifier
+ * @param qoffset qualifier offset
+ * @param qlength qualifier length
+ * @param timestamp version timestamp
+ * @param value column value
+ * @param voffset value offset
+ * @param vlength value length
+ * @return created Cell
+ * @throws IOException
+ */
+ public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+ byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+ int vlength) throws IOException {
+ return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
+ timestamp, value, voffset, vlength, (List<Tag>)null);
+ }
+
+ /**
+ * @param row row key
+ * @param roffset row offset
+ * @param rlength row length
+ * @param family family name
+ * @param foffset family offset
+ * @param flength family length
+ * @param qualifier column qualifier
+ * @param qoffset qualifier offset
+ * @param qlength qualifier length
+ * @param timestamp version timestamp
+ * @param value column value
+ * @param voffset value offset
+ * @param vlength value length
+ * @param visExpression visibility expression to be associated with cell
+ * @return created Cell
+ * @throws IOException
+ */
+ @Deprecated
+ public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+ byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+ int vlength, String visExpression) throws IOException {
+ List<Tag> visTags = null;
+ if (visExpression != null) {
+ visTags = this.visExpResolver.createVisibilityExpTags(visExpression);
+ }
+ return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+ qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
+ }
+
+ /**
+ * @param row row key
+ * @param roffset row offset
+ * @param rlength row length
+ * @param family family name
+ * @param foffset family offset
+ * @param flength family length
+ * @param qualifier column qualifier
+ * @param qoffset qualifier offset
+ * @param qlength qualifier length
+ * @param timestamp version timestamp
+ * @param value column value
+ * @param voffset value offset
+ * @param vlength value length
+ * @param tags tags to be associated with the cell
+ * @return created Cell
+ * @throws IOException
+ */
+ public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+ byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+ int vlength, List<Tag> tags) throws IOException {
+ return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+ qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+ }
+
+ /**
+ * @return Visibility expression resolver
+ */
+ public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+ return this.visExpResolver;
+ }
+}
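A short usage sketch of the List<Tag> overload above; the row, family,
qualifier, and value are illustrative, and a reachable cluster is assumed
since the default visibility expression resolver initializes against it:

    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.Tag;
    import org.apache.hadoop.hbase.mapreduce.CellCreator;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CellCreatorExample {
      public static void main(String[] args) throws Exception {
        byte[] row = Bytes.toBytes("row1");
        byte[] fam = Bytes.toBytes("f");
        byte[] qual = Bytes.toBytes("q");
        byte[] val = Bytes.toBytes("v");
        CellCreator creator = new CellCreator(HBaseConfiguration.create());
        // Build a Put-type Cell with no tags attached.
        Cell cell = creator.create(row, 0, row.length, fam, 0, fam.length,
            qual, 0, qual.length, System.currentTimeMillis(),
            val, 0, val.length, (List<Tag>) null);
        System.out.println(cell);
      }
    }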