Posted to commits@hbase.apache.org by la...@apache.org on 2013/02/02 08:33:27 UTC
svn commit: r1441708 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/mapreduce/
test/java/org/apache/hadoop/hbase/mapreduce/
Author: larsh
Date: Sat Feb 2 07:33:27 2013
New Revision: 1441708
URL: http://svn.apache.org/viewvc?rev=1441708&view=rev
Log:
HBASE-3996 Support multiple tables and scanners as input to the mapper in map/reduce jobs (Eran Kutner, Bryan Baugher)
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1441708&r1=1441707&r2=1441708&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java Sat Feb 2 07:33:27 2013
@@ -96,11 +96,14 @@ public class Scan extends OperationWithA
// If application wants to collect scan metrics, it needs to
// call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
- static public final String SCAN_ATTRIBUTES_METRICS_ENABLE =
- "scan.attributes.metrics.enable";
- static public final String SCAN_ATTRIBUTES_METRICS_DATA =
- "scan.attributes.metrics.data";
-
+ static public final String SCAN_ATTRIBUTES_METRICS_ENABLE = "scan.attributes.metrics.enable";
+ static public final String SCAN_ATTRIBUTES_METRICS_DATA = "scan.attributes.metrics.data";
+
+ // If an application wants to use multiple scans over different tables, each scan must
+ // define this attribute with the appropriate table name by calling
+ // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
+ static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
+
/*
* -1 means no caching
*/
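For illustration, a minimal sketch of how a client tags each Scan with its
source table using the new attribute ("myTable" is a placeholder name, not
part of this commit):

    // Hypothetical usage; "myTable" stands in for a real table name.
    Scan scan = new Scan();
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("myTable"));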
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java?rev=1441708&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java Sat Feb 2 07:33:27 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * Convert HBase tabular data from multiple scanners into a format that
+ * is consumable by Map/Reduce.
+ *
+ * <p>
+ * Usage example
+ * </p>
+ *
+ * <pre>
+ * List<Scan> scans = new ArrayList<Scan>();
+ *
+ * Scan scan1 = new Scan();
+ * scan1.setStartRow(firstRow1);
+ * scan1.setStopRow(lastRow1);
+ * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
+ * scans.add(scan1);
+ *
+ * Scan scan2 = new Scan();
+ * scan2.setStartRow(firstRow2);
+ * scan2.setStopRow(lastRow2);
+ * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
+ * scans.add(scan2);
+ *
+ * TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class,
+ * IntWritable.class, job);
+ * </pre>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MultiTableInputFormat extends MultiTableInputFormatBase implements
+ Configurable {
+
+ /** Job parameter that specifies the scan list. */
+ public static final String SCANS = "hbase.mapreduce.scans";
+
+ /** The configuration. */
+ private Configuration conf = null;
+
+ /**
+ * Returns the current configuration.
+ *
+ * @return The current configuration.
+ * @see org.apache.hadoop.conf.Configurable#getConf()
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Sets the configuration. This is used to set the details for the tables to
+ * be scanned.
+ *
+ * @param configuration The configuration to set.
+ * @see org.apache.hadoop.conf.Configurable#setConf(
+ * org.apache.hadoop.conf.Configuration)
+ */
+ @Override
+ public void setConf(Configuration configuration) {
+ this.conf = configuration;
+ String[] rawScans = conf.getStrings(SCANS);
+ if (rawScans == null || rawScans.length == 0) {
+ throw new IllegalArgumentException("There must be at least 1 scan configuration set to: "
+ + SCANS);
+ }
+ List<Scan> scans = new ArrayList<Scan>();
+
+ for (int i = 0; i < rawScans.length; i++) {
+ try {
+ scans.add(TableMapReduceUtil.convertStringToScan(rawScans[i]));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to convert String to Scan: " + rawScans[i], e);
+ }
+ }
+ this.setScans(scans);
+ }
+}
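A minimal sketch of the round trip setConf() relies on: each Scan is
serialized to a string by the (package-private) helper
TableMapReduceUtil.convertScanToString() and stored under the SCANS key,
then parsed back by setConf(). The snippet assumes it runs inside the
org.apache.hadoop.hbase.mapreduce package so the helper is visible:

    // Illustrative only: how the SCANS property is populated and consumed.
    Configuration conf = HBaseConfiguration.create();
    List<String> scanStrings = new ArrayList<String>();
    for (Scan scan : scans) {           // 'scans' built by the caller
      scanStrings.add(TableMapReduceUtil.convertScanToString(scan));
    }
    conf.setStrings(MultiTableInputFormat.SCANS,
        scanStrings.toArray(new String[scanStrings.size()]));

    MultiTableInputFormat inputFormat = new MultiTableInputFormat();
    inputFormat.setConf(conf);          // rebuilds the List<Scan> internally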
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java?rev=1441708&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java Sat Feb 2 07:33:27 2013
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A base for {@link MultiTableInputFormat}s. Receives a list of
+ * {@link Scan} instances that define the input tables, filters, and so on.
+ * Subclasses may use other TableRecordReader implementations.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class MultiTableInputFormatBase extends
+ InputFormat<ImmutableBytesWritable, Result> {
+
+ final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);
+
+ /** Holds the set of scans used to define the input. */
+ private List<Scan> scans;
+
+ /** The reader scanning the table, can be a custom one. */
+ private TableRecordReader tableRecordReader = null;
+
+ /**
+ * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+ * default.
+ *
+ * @param split The split to work with.
+ * @param context The current context.
+ * @return The newly created record reader.
+ * @throws IOException When creating the reader fails.
+ * @throws InterruptedException when record reader initialization fails
+ * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
+ * org.apache.hadoop.mapreduce.InputSplit,
+ * org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+ InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ TableSplit tSplit = (TableSplit) split;
+
+ if (tSplit.getTableName() == null) {
+ throw new IOException("Cannot create a record reader because of a"
+ + " previous error. Please look at the previous logs lines from"
+ + " the task's full log for more details.");
+ }
+ HTable table =
+ new HTable(context.getConfiguration(), tSplit.getTableName());
+
+ TableRecordReader trr = this.tableRecordReader;
+ // if no table record reader was provided use default
+ if (trr == null) {
+ trr = new TableRecordReader();
+ }
+ Scan sc = tSplit.getScan();
+ sc.setStartRow(tSplit.getStartRow());
+ sc.setStopRow(tSplit.getEndRow());
+ trr.setScan(sc);
+ trr.setHTable(table);
+ trr.initialize(split, context);
+ return trr;
+ }
+
+ /**
+ * Calculates the splits that will serve as input for the map tasks. The
+ * number of splits depends on the number of regions in each scanned table.
+ *
+ * @param context The current job context.
+ * @return The list of input splits.
+ * @throws IOException When creating the list of splits fails.
+ * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException {
+ if (scans.isEmpty()) {
+ throw new IOException("No scans were provided.");
+ }
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+
+ for (Scan scan : scans) {
+ byte[] tableName = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
+ if (tableName == null)
+ throw new IOException("A scan object did not have a table name");
+ HTable table = new HTable(context.getConfiguration(), tableName);
+ Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+ if (keys == null || keys.getFirst() == null ||
+ keys.getFirst().length == 0) {
+ throw new IOException("Expecting at least one region for table : "
+ + Bytes.toString(tableName));
+ }
+ int count = 0;
+
+ byte[] startRow = scan.getStartRow();
+ byte[] stopRow = scan.getStopRow();
+
+ for (int i = 0; i < keys.getFirst().length; i++) {
+ if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+ continue;
+ }
+ String regionLocation =
+ table.getRegionLocation(keys.getFirst()[i], false).getHostname();
+
+ // determine if the given start and stop keys fall into the range
+ if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+ Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+ (stopRow.length == 0 ||
+ Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+ byte[] splitStart =
+ startRow.length == 0 ||
+ Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys
+ .getFirst()[i] : startRow;
+ byte[] splitStop =
+ (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i],
+ stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys
+ .getSecond()[i] : stopRow;
+ InputSplit split =
+ new TableSplit(tableName, scan, splitStart,
+ splitStop, regionLocation);
+ splits.add(split);
+ if (LOG.isDebugEnabled())
+ LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+ }
+ }
+ table.close();
+ }
+ return splits;
+ }
+
+ /**
+ * Test if the given region is to be included in the InputSplit while
+ * splitting the regions of a table.
+ * <p>
+ * This optimization is effective when there is a specific reason to
+ * exclude an entire region from the M-R job (and hence not contribute an
+ * InputSplit), given the start and end keys of that region. <br>
+ * Useful when we need to remember the last-processed top record and revisit
+ * the [last, current) interval for M-R processing, continuously. In addition
+ * to reducing InputSplits, this reduces the load on the region server as
+ * well, due to the ordering of the keys. <br>
+ * <br>
+ * Note: It is possible that <code>endKey.length() == 0</code>, for the last
+ * (recent) region. <br>
+ * Override this method if you want to bulk exclude regions altogether from
+ * M-R. By default, no region is excluded (i.e. all regions are included).
+ *
+ * @param startKey Start key of the region
+ * @param endKey End key of the region
+ * @return true, if this region needs to be included as part of the input
+ * (default).
+ */
+ protected boolean includeRegionInSplit(final byte[] startKey,
+ final byte[] endKey) {
+ return true;
+ }
+
+ /**
+ * Allows subclasses to get the list of {@link Scan} objects.
+ */
+ protected List<Scan> getScans() {
+ return this.scans;
+ }
+
+ /**
+ * Allows subclasses to set the list of {@link Scan} objects.
+ *
+ * @param scans The list of {@link Scan} used to define the input
+ */
+ protected void setScans(List<Scan> scans) {
+ this.scans = scans;
+ }
+
+ /**
+ * Allows subclasses to set the {@link TableRecordReader}.
+ *
+ * @param tableRecordReader A different {@link TableRecordReader}
+ * implementation.
+ */
+ protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
+ }
+}
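The javadoc above describes includeRegionInSplit() as the hook for bulk
excluding regions. As a sketch of that extension point, a hypothetical
subclass that resumes from a remembered cut-off key (CUTOFF is an
illustrative constant, not part of this commit):

    // Hypothetical subclass skipping regions that end at or before a cut-off.
    public class ResumingMultiTableInputFormat extends MultiTableInputFormat {
      private static final byte[] CUTOFF = Bytes.toBytes("row-05000");

      @Override
      protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        // An empty end key marks the last region of a table; always keep it.
        return endKey.length == 0 || Bytes.compareTo(endKey, CUTOFF) > 0;
      }
    }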
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1441708&r1=1441707&r2=1441708&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Sat Feb 2 07:33:27 2013
@@ -23,8 +23,10 @@ import java.lang.reflect.InvocationTarge
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLDecoder;
+import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.util.Base
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
@@ -218,6 +221,67 @@ public class TableMapReduceUtil {
initTableMapperJob(table, scan, mapper, outputKeyClass,
outputValueClass, job, addDependencyJars, TableInputFormat.class);
}
+
+ /**
+ * Use this before submitting a Multi TableMap job. It will appropriately set
+ * up the job.
+ *
+ * @param scans The list of {@link Scan} objects to read from.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is carrying
+ * all necessary HBase configuration.
+ * @throws IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(List<Scan> scans,
+ Class<? extends TableMapper> mapper,
+ Class<? extends WritableComparable> outputKeyClass,
+ Class<? extends Writable> outputValueClass, Job job) throws IOException {
+ initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
+ true);
+ }
+
+ /**
+ * Use this before submitting a Multi TableMap job. It will appropriately set
+ * up the job.
+ *
+ * @param scans The list of {@link Scan} objects to read from.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is carrying
+ * all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the
+ * configured job classes via the distributed cache (tmpjars).
+ * @throws IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(List<Scan> scans,
+ Class<? extends TableMapper> mapper,
+ Class<? extends WritableComparable> outputKeyClass,
+ Class<? extends Writable> outputValueClass, Job job,
+ boolean addDependencyJars) throws IOException {
+ job.setInputFormatClass(MultiTableInputFormat.class);
+ if (outputValueClass != null) {
+ job.setMapOutputValueClass(outputValueClass);
+ }
+ if (outputKeyClass != null) {
+ job.setMapOutputKeyClass(outputKeyClass);
+ }
+ job.setMapperClass(mapper);
+ HBaseConfiguration.addHbaseResources(job.getConfiguration());
+ List<String> scanStrings = new ArrayList<String>();
+
+ for (Scan scan : scans) {
+ scanStrings.add(convertScanToString(scan));
+ }
+ job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
+ scanStrings.toArray(new String[scanStrings.size()]));
+
+ if (addDependencyJars) {
+ addDependencyJars(job);
+ }
+ }
public static void initCredentials(Job job) throws IOException {
if (User.isHBaseSecurityEnabled(job.getConfiguration())) {
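Putting the new overload together, a minimal driver sketch (the table names,
MyMapper, and the job name are placeholders; the key/value classes follow the
test added below):

    // Illustrative driver for the new multi-scan initTableMapperJob().
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "multi-table-scan");

    List<Scan> scans = new ArrayList<Scan>();
    for (String tableName : new String[] { "table1", "table2" }) {
      Scan scan = new Scan();
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
      scans.add(scan);
    }

    TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);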
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java?rev=1441708&r1=1441707&r2=1441708&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java Sat Feb 2 07:33:27 2013
@@ -23,30 +23,68 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
/**
- * A table split corresponds to a key range (low, high). All references to row
- * below refer to the key of the row.
+ * A table split corresponds to a key range (low, high) and an optional scanner.
+ * All references to row below refer to the key of the row.
*/
@InterfaceAudience.Public
-@InterfaceStability.Stable
+@InterfaceStability.Evolving
public class TableSplit extends InputSplit
implements Writable, Comparable<TableSplit> {
+ public static final Log LOG = LogFactory.getLog(TableSplit.class);
+
+ // should be < 0 (@see #readFields(DataInput))
+ // version 1 supports Scan data member
+ enum Version {
+ UNVERSIONED(0),
+ // Initial number we put on TableSplit when we introduced versioning.
+ INITIAL(-1);
+
+ final int code;
+ static final Version[] byCode;
+ static {
+ byCode = Version.values();
+ for (int i = 0; i < byCode.length; i++) {
+ if (byCode[i].code != -1 * i) {
+ throw new AssertionError("Values in this enum should be descending by one");
+ }
+ }
+ }
+
+ Version(int code) {
+ this.code = code;
+ }
+
+ boolean atLeast(Version other) {
+ return code <= other.code;
+ }
+ static Version fromCode(int code) {
+ return byCode[code * -1];
+ }
+ }
+
+ private static final Version VERSION = Version.INITIAL;
private byte [] tableName;
private byte [] startRow;
private byte [] endRow;
private String regionLocation;
+ private String scan = ""; // stores the serialized form of the Scan
/** Default constructor. */
public TableSplit() {
- this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+ this(HConstants.EMPTY_BYTE_ARRAY, null, HConstants.EMPTY_BYTE_ARRAY,
HConstants.EMPTY_BYTE_ARRAY, "");
}
@@ -54,17 +92,47 @@ implements Writable, Comparable<TableSpl
* Creates a new instance while assigning all variables.
*
* @param tableName The name of the current table.
+ * @param scan The scan associated with this split.
* @param startRow The start row of the split.
* @param endRow The end row of the split.
* @param location The location of the region.
*/
- public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+ public TableSplit(byte [] tableName, Scan scan, byte [] startRow, byte [] endRow,
final String location) {
this.tableName = tableName;
+ try {
+ this.scan =
+ (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
+ } catch (IOException e) {
+ LOG.warn("Failed to convert Scan to String", e);
+ }
this.startRow = startRow;
this.endRow = endRow;
this.regionLocation = location;
}
+
+ /**
+ * Creates a new instance without a scanner.
+ *
+ * @param tableName The name of the current table.
+ * @param startRow The start row of the split.
+ * @param endRow The end row of the split.
+ * @param location The location of the region.
+ */
+ public TableSplit(byte[] tableName, byte[] startRow, byte[] endRow,
+ final String location) {
+ this(tableName, null, startRow, endRow, location);
+ }
+
+ /**
+ * Returns a Scan object from the stored string representation.
+ *
+ * @return Returns a Scan object based on the stored scanner.
+ * @throws IOException When reconstructing the scan fails.
+ */
+ public Scan getScan() throws IOException {
+ return TableMapReduceUtil.convertStringToScan(this.scan);
+ }
/**
* Returns the table name.
@@ -133,10 +201,29 @@ implements Writable, Comparable<TableSpl
*/
@Override
public void readFields(DataInput in) throws IOException {
- tableName = Bytes.readByteArray(in);
+ Version version = Version.UNVERSIONED;
+ // TableSplit was not versioned in the beginning.
+ // In order to introduce it now, we make use of the fact
+ // that tableName was written with Bytes.writeByteArray,
+ // which encodes the array length as a vint which is >= 0.
+ // Hence if the vint is >= 0 we have an old version and the vint
+ // encodes the length of tableName.
+ // If < 0 we just read the version and the next vint is the length.
+ // @see Bytes#readByteArray(DataInput)
+ int len = WritableUtils.readVInt(in);
+ if (len < 0) {
+ // what we just read was the version
+ version = Version.fromCode(len);
+ len = WritableUtils.readVInt(in);
+ }
+ tableName = new byte[len];
+ in.readFully(tableName);
startRow = Bytes.readByteArray(in);
endRow = Bytes.readByteArray(in);
regionLocation = Bytes.toString(Bytes.readByteArray(in));
+ if (version.atLeast(Version.INITIAL)) {
+ scan = Bytes.toString(Bytes.readByteArray(in));
+ }
}
/**
@@ -147,10 +234,12 @@ implements Writable, Comparable<TableSpl
*/
@Override
public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, VERSION.code);
Bytes.writeByteArray(out, tableName);
Bytes.writeByteArray(out, startRow);
Bytes.writeByteArray(out, endRow);
Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
+ Bytes.writeByteArray(out, Bytes.toBytes(scan));
}
/**
@@ -174,7 +263,12 @@ implements Writable, Comparable<TableSpl
*/
@Override
public int compareTo(TableSplit split) {
- return Bytes.compareTo(getStartRow(), split.getStartRow());
+ // If the table names of the two splits are the same, compare start rows;
+ // otherwise compare based on table names.
+ int tableNameComparison =
+ Bytes.compareTo(getTableName(), split.getTableName());
+ return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo(
+ getStartRow(), split.getStartRow());
}
@Override
@@ -191,6 +285,7 @@ implements Writable, Comparable<TableSpl
@Override
public int hashCode() {
int result = tableName != null ? Arrays.hashCode(tableName) : 0;
+ result = 31 * result + (scan != null ? scan.hashCode() : 0);
result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
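To make the versioning trick in readFields() concrete: Bytes.writeByteArray()
always starts with a non-negative vint length, so a negative first vint can
only be the new version marker. A small illustrative helper under that
assumption:

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    // Returns true when the serialized split was written by the new,
    // versioned write() above (first vint is the negative version code).
    static boolean isVersionedSplit(byte[] serialized) throws IOException {
      DataInputBuffer in = new DataInputBuffer();
      in.reset(serialized, serialized.length);
      return WritableUtils.readVInt(in) < 0;
    }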
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java?rev=1441708&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java Sat Feb 2 07:33:27 2013
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests various scan start and stop row scenarios. The rows are set on the
+ * scans and verified in a MapReduce job to check that they are handed over
+ * and honored properly.
+ */
+@Category(LargeTests.class)
+public class TestMultiTableInputFormat {
+
+ static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
+ static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ static final String TABLE_NAME = "scantest";
+ static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+ static final String KEY_STARTROW = "startRow";
+ static final String KEY_LASTROW = "stopRow";
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // switch the input formats to log at DEBUG level
+ TEST_UTIL.enableDebug(MultiTableInputFormat.class);
+ TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
+ // start mini hbase cluster
+ TEST_UTIL.startMiniCluster(3);
+ // create and fill table
+ for (int i = 0; i < 3; i++) {
+ HTable table =
+ TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME + String.valueOf(i)),
+ INPUT_FAMILY);
+ TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+ TEST_UTIL.loadTable(table, INPUT_FAMILY);
+ }
+ // start MR cluster
+ TEST_UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniMapReduceCluster();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Configuration c = TEST_UTIL.getConfiguration();
+ FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
+ }
+
+ /**
+ * Pass the key and value to reducer.
+ */
+ public static class ScanMapper extends
+ TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+ /**
+ * Pass the key and value to reduce.
+ *
+ * @param key The key, here "aaa", "aab" etc.
+ * @param value The value is the same as the key.
+ * @param context The task context.
+ * @throws IOException When reading the rows fails.
+ */
+ @Override
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ if (value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
+ value.getMap();
+ if (!cf.containsKey(INPUT_FAMILY)) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILY) + "'.");
+ }
+ String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+ LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
+ ", value -> " + val);
+ context.write(key, key);
+ }
+ }
+
+ /**
+ * Checks the last and first keys seen against the scanner boundaries.
+ */
+ public static class ScanReducer
+ extends
+ Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+ NullWritable, NullWritable> {
+ private String first = null;
+ private String last = null;
+
+ protected void reduce(ImmutableBytesWritable key,
+ Iterable<ImmutableBytesWritable> values, Context context)
+ throws IOException, InterruptedException {
+ int count = 0;
+ for (ImmutableBytesWritable value : values) {
+ String val = Bytes.toStringBinary(value.get());
+ LOG.debug("reduce: key[" + count + "] -> " +
+ Bytes.toStringBinary(key.get()) + ", value -> " + val);
+ if (first == null) first = val;
+ last = val;
+ count++;
+ }
+ assertEquals(3, count);
+ }
+
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ Configuration c = context.getConfiguration();
+ String startRow = c.get(KEY_STARTROW);
+ String lastRow = c.get(KEY_LASTROW);
+ LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
+ startRow + "\"");
+ LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
+ "\"");
+ if (startRow != null && startRow.length() > 0) {
+ assertEquals(startRow, first);
+ }
+ if (lastRow != null && lastRow.length() > 0) {
+ assertEquals(lastRow, last);
+ }
+ }
+ }
+
+ @Test
+ public void testScanEmptyToEmpty() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, null, null);
+ }
+
+ @Test
+ public void testScanEmptyToAPP() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, "app", "apo");
+ }
+
+ @Test
+ public void testScanOBBToOPP() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan("obb", "opp", "opo");
+ }
+
+ @Test
+ public void testScanOPPToEmpty() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan("opp", null, "zzz");
+ }
+
+ @Test
+ public void testScanYZYToEmpty() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan("yzy", null, "zzz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ private void testScan(String start, String stop, String last)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ String jobName =
+ "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
+ (stop != null ? stop.toUpperCase() : "Empty");
+ LOG.info("Before map/reduce startup - job " + jobName);
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+ c.set(KEY_STARTROW, start != null ? start : "");
+ c.set(KEY_LASTROW, last != null ? last : "");
+
+ List<Scan> scans = new ArrayList<Scan>();
+
+ for (int i = 0; i < 3; i++) {
+ Scan scan = new Scan();
+
+ scan.addFamily(INPUT_FAMILY);
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i));
+
+ if (start != null) {
+ scan.setStartRow(Bytes.toBytes(start));
+ }
+ if (stop != null) {
+ scan.setStopRow(Bytes.toBytes(stop));
+ }
+
+ scans.add(scan);
+
+ LOG.info("scan before: " + scan);
+ }
+
+ Job job = new Job(c, jobName);
+
+ TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+ job.setReducerClass(ScanReducer.class);
+ job.setNumReduceTasks(1); // one to get final "first" and "last" key
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ LOG.info("Started " + job.getJobName());
+ job.waitForCompletion(true);
+ assertTrue(job.isSuccessful());
+ LOG.info("After map/reduce completion - job " + jobName);
+ }
+}