You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/11/06 20:18:20 UTC
svn commit: r1539432 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase:
client/HTable.java mapred/TableInputFormat.java
mapred/TableInputFormatBase.java mapreduce/TableInputFormat.java
mapreduce/TableInputFormatBase.java
Author: liyin
Date: Wed Nov 6 19:18:20 2013
New Revision: 1539432
URL: http://svn.apache.org/r1539432
Log:
[HBASE-9896] Adds mappers/job option for HBaseStreaming
Author: rshroff
Summary:
Currently the split algorithm only supports a mappers/region setting for any
MR job for HBase Streaming. The reason is to preserve locality between the
region server hosting the region and the mapper task.
However, in certain scenarios we need to have a strict limit on
the number of mappers per job.
This change adds an additional parameter to control the total
number of mappers/job. The numMappersPerRegion and numMappersPerJob
settings are mutually exclusive.
Test Plan: tested with various startRow/stopRow/num_mapper configs on SH004
Reviewers: manukranthk, liyintang
Reviewed By: liyintang
CC: hbase-eng@, chaoyc, nzhang
Differential Revision: https://phabricator.fb.com/D1032630
Task ID: 2381046
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1539432&r1=1539431&r2=1539432&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Nov 6 19:18:20 2013
@@ -394,6 +394,35 @@ public class HTable implements HTableInt
}
/**
+ * Gets the starting and ending row keys for every region in the currently
+ * open table.
+ * <p>
+ * This is mainly useful for the MapReduce integration.
+ * @return TreeMap of {startKey,endKey} pairs
+ * @throws IOException if a remote or network exception occurs
+ */
+ @SuppressWarnings("unchecked")
+ public TreeMap<byte[], byte[]> getStartEndKeysMap() throws IOException {
+ final TreeMap<byte[], byte[]> startEndKeysMap =
+ new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
+ MetaScannerVisitor visitor = new MetaScannerVisitor() {
+ public boolean processRow(Result rowResult) throws IOException {
+ HRegionInfo info = Writables.getHRegionInfo(
+ rowResult.getValue(HConstants.CATALOG_FAMILY,
+ HConstants.REGIONINFO_QUALIFIER));
+ if (Bytes.equals(info.getTableDesc().getName(), getTableName())) {
+ if (!(info.isOffline() || info.isSplit())) {
+ startEndKeysMap.put(info.getStartKey(), info.getEndKey());
+ }
+ }
+ return true;
+ }
+ };
+ MetaScanner.metaScan(configuration, visitor, this.tableName);
+ return startEndKeysMap;
+ }
+
+ /**
* Returns the Array of StartKeys along with the favoredNodes
* for a particular region. Identifying the the favoredNodes using the
* Meta table similar to the
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java?rev=1539432&r1=1539431&r2=1539432&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java Wed Nov 6 19:18:20 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.JobConfi
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.MAPPERS_PER_REGION;
+import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.NUM_MAPPERS_PER_JOB;
import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.SPLIT_ALGO;
/**
@@ -70,6 +71,10 @@ public class TableInputFormat extends Ta
setNumMapperPerRegion(Integer.parseInt(job.get(MAPPERS_PER_REGION)));
}
+ if (job.get(NUM_MAPPERS_PER_JOB) != null) {
+ setNumMappersPerJob(Integer.parseInt(job.get(NUM_MAPPERS_PER_JOB)));
+ }
+
setSplitAlgorithm(job.get(SPLIT_ALGO, UniformSplit.class.getSimpleName()));
}
@@ -92,4 +97,4 @@ public class TableInputFormat extends Ta
throw new IOException("expecting at least one column");
}
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java?rev=1539432&r1=1539431&r2=1539432&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java Wed Nov 6 19:18:20 2013
@@ -79,7 +79,12 @@ implements InputFormat<ImmutableBytesWri
private Scan scan;
/** The number of mappers to assign to each region. */
- private int numMappersPerRegion = 1;
+ private int numMappers = 1;
+
+ /** The total number of mappers. The number of mappers per region is
+ * mutually exclusive with number of mappers per job. In case both
+ * are defined, mapperPerJob will take the precedence.*/
+ private boolean mappersPerJob = false;
/** Splitting algorithm to be used to split the keys */
private String splitAlgmName; // default to Uniform
@@ -134,7 +139,7 @@ implements InputFormat<ImmutableBytesWri
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplitsInternal(
- table, job, scan, numMappersPerRegion, splitAlgmName, null);
+ table, job, scan, numMappers, mappersPerJob, splitAlgmName, null);
int n = newStyleSplits.size();
InputSplit[] result = new InputSplit[n];
for (int i = 0; i < n; ++i) {
@@ -210,7 +215,30 @@ implements InputFormat<ImmutableBytesWri
throw new IllegalArgumentException("Expecting at least 1 mapper " +
"per region; instead got: " + num);
}
- numMappersPerRegion = num;
+ if (!mappersPerJob) {
+ numMappers = num;
+ } else {
+ LOG.warn("Ignoring mappersPerRegion config value as mappersPerJob is" +
+ " already set.");
+ }
+ }
+
+ /**
+ * Sets the number of mappers per job.
+ *
+ * @param num
+ * @throws IllegalArgumentException When <code>num</code> <= 0.
+ */
+ public void setNumMappersPerJob(int num) throws IllegalArgumentException {
+ if (num <= 0) {
+ throw new IllegalArgumentException("Expecting at least 1 mapper " +
+ "per region; instead got: " + num);
+ }
+ if (numMappers > 1) {
+ LOG.warn("Overriding num of mappers per region.");
+ }
+ numMappers = num;
+ mappersPerJob = true;
}
public void setSplitAlgorithm(String name) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=1539432&r1=1539431&r2=1539432&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java Wed Nov 6 19:18:20 2013
@@ -93,6 +93,10 @@ implements Configurable {
/** The number of mappers that should be assigned to each region. */
public static final String MAPPERS_PER_REGION = "hbase.mapreduce.mappersperregion";
+ /** The number of mappers that should be assigned per job. */
+ public static final String NUM_MAPPERS_PER_JOB = "hbase.mapreduce.num.mappers";
+
+
/** The Algorithm used to split each region's keyspace. */
public static final String SPLIT_ALGO = "hbase.mapreduce.tableinputformat.split.algo";
@@ -137,6 +141,10 @@ implements Configurable {
setNumMapperPerRegion(Integer.parseInt(conf.get(MAPPERS_PER_REGION)));
}
+ if (conf.get(NUM_MAPPERS_PER_JOB) != null) {
+ setNumMappersPerJob(Integer.parseInt(conf.get(NUM_MAPPERS_PER_JOB)));
+ }
+
setSplitAlgorithm(conf.get(SPLIT_ALGO, UniformSplit.class.getSimpleName()));
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1539432&r1=1539431&r2=1539432&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Wed Nov 6 19:18:20 2013
@@ -21,10 +21,10 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.IOException;
import java.net.InetAddress;
-import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import javax.naming.NamingException;
@@ -67,7 +67,11 @@ extends InputFormat<ImmutableBytesWritab
/** The reader scanning the table, can be a custom one. */
private TableRecordReader tableRecordReader = null;
/** The number of mappers to assign to each region. */
- private int numMappersPerRegion = 1;
+ private int numMappers = 1;
+ /** The total number of mappers. The number of mappers per region is
+ * mutually exclusive with number of mappers per job. In case both
+ * are defined, mappersPerJob will take the precedence.*/
+ private boolean mappersPerJob = false;
/** Splitting algorithm to be used to split the keys */
private String splitAlgmName; // default to Uniform
@@ -121,14 +125,15 @@ extends InputFormat<ImmutableBytesWritab
*/
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
- return getSplitsInternal(table, context.getConfiguration(), scan, numMappersPerRegion,
- splitAlgmName, this);
+ return getSplitsInternal(table, context.getConfiguration(), scan, numMappers,
+ mappersPerJob, splitAlgmName, this);
}
public static List<InputSplit> getSplitsInternal(HTable table,
Configuration conf,
Scan scan,
- int numMappersPerRegion,
+ int numMappers,
+ boolean mappersPerJob,
String splitAlgmName,
TableInputFormatBase tifb) throws IOException {
if (table == null) {
@@ -136,6 +141,11 @@ extends InputFormat<ImmutableBytesWritab
}
determineNameServer(conf);
+ if (mappersPerJob) {
+ return getSplitsInternalBasedOnNumMappersPerJob(table, conf, scan, numMappers,
+ splitAlgmName);
+ }
+
Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null ||
keys.getFirst().length == 0) {
@@ -144,15 +154,15 @@ extends InputFormat<ImmutableBytesWritab
Pair<byte[][], byte[][]> splitKeys = null;
int numRegions = keys.getFirst().length;
//TODO: Can anything else be done when there are less than 3 regions?
- if ((numMappersPerRegion == 1) || (numRegions < 3)) {
- numMappersPerRegion = 1;
+ if ((numMappers == 1) || (numRegions < 3)) {
+ numMappers = 1;
splitKeys = keys;
} else {
- byte[][] startKeys = new byte[numRegions * numMappersPerRegion][];
- byte[][] stopKeys = new byte[numRegions * numMappersPerRegion][];
+ byte[][] startKeys = new byte[numRegions * numMappers][];
+ byte[][] stopKeys = new byte[numRegions * numMappers][];
// Insert null keys at edges
startKeys[0] = HConstants.EMPTY_START_ROW;
- stopKeys[numRegions * numMappersPerRegion - 1] = HConstants.EMPTY_END_ROW;
+ stopKeys[numRegions * numMappers - 1] = HConstants.EMPTY_END_ROW;
byte[][] originalStartKeys = keys.getFirst();
byte[][] originalStopKeys = keys.getSecond();
@@ -166,14 +176,14 @@ extends InputFormat<ImmutableBytesWritab
algmImpl.setFirstRow(algmImpl.rowToStr(originalStartKeys[i]));
if (originalStopKeys[i].length != 0)
algmImpl.setLastRow(algmImpl.rowToStr(originalStopKeys[i]));
- byte[][] dividingKeys = algmImpl.split(numMappersPerRegion);
+ byte[][] dividingKeys = algmImpl.split(numMappers);
- startKeys[i*numMappersPerRegion] = originalStartKeys[i];
- for (int j = 0; j < numMappersPerRegion - 1; j++) {
- stopKeys[i * numMappersPerRegion + j] = dividingKeys[j];
- startKeys[i * numMappersPerRegion + j + 1] = dividingKeys[j];
+ startKeys[i*numMappers] = originalStartKeys[i];
+ for (int j = 0; j < numMappers - 1; j++) {
+ stopKeys[i * numMappers + j] = dividingKeys[j];
+ startKeys[i * numMappers + j + 1] = dividingKeys[j];
}
- stopKeys[(i+1)*numMappersPerRegion - 1] = originalStopKeys[i];
+ stopKeys[(i+1)*numMappers - 1] = originalStopKeys[i];
}
splitKeys = new Pair<byte[][], byte[][]>();
splitKeys.setFirst(startKeys);
@@ -181,12 +191,12 @@ extends InputFormat<ImmutableBytesWritab
}
List<InputSplit> splits =
- new ArrayList<InputSplit>(numRegions * numMappersPerRegion);
+ new ArrayList<InputSplit>(numRegions * numMappers);
byte[] startRow = scan.getStartRow();
byte[] stopRow = scan.getStopRow();
- for (int i = 0; i < numRegions * numMappersPerRegion; i++) {
- if (tifb != null && !tifb.includeRegionInSplit(keys.getFirst()[i / numMappersPerRegion],
- keys.getSecond()[i / numMappersPerRegion])) {
+ for (int i = 0; i < numRegions * numMappers; i++) {
+ if (tifb != null && !tifb.includeRegionInSplit(keys.getFirst()[i / numMappers],
+ keys.getSecond()[i / numMappers])) {
continue;
}
HServerAddress regionServerAddress =
@@ -226,6 +236,78 @@ extends InputFormat<ImmutableBytesWritab
return splits;
}
+ public static List<InputSplit> getSplitsInternalBasedOnNumMappersPerJob(
+ HTable table, Configuration conf, Scan scan, int numMappersPerJob,
+ String splitAlgmName) throws IOException {
+ List<InputSplit> splits =
+ new ArrayList<InputSplit>(numMappersPerJob);
+ byte[] startRow = scan.getStartRow();
+ byte[] stopRow = scan.getStopRow();
+
+ SplitAlgorithm algmImpl = RegionSplitter.newSplitAlgoInstance(conf,
+ splitAlgmName);
+ if (startRow.length != 0)
+ algmImpl.setFirstRow(algmImpl.rowToStr(startRow));
+ if (stopRow.length != 0)
+ algmImpl.setLastRow(algmImpl.rowToStr(stopRow));
+
+ // This will return numMappersPerJob - 1 split interval
+ byte[][] dividingKeys = algmImpl.split(numMappersPerJob);
+
+ byte[] splitStartKey = algmImpl.firstRow();
+ byte[] splitStopKey = algmImpl.lastRow();
+
+ TreeMap<byte[], byte[]> regions = table.getStartEndKeysMap();
+
+ // No splits possible, just default it to 1
+ if (dividingKeys == null || dividingKeys.length == 0) {
+ byte[] startRowOfFullRegion = getNearestFullRegion(regions, splitStartKey,
+ splitStopKey);
+ InputSplit split = new TableSplit(table.getTableName(),
+ splitStartKey, splitStopKey, getRegionServerAddress(table,
+ startRowOfFullRegion));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getSplits: split -> " + split);
+ }
+ splits.add(split);
+ } else {
+ for (int i = 0; i <= dividingKeys.length; i++) {
+ if (i == dividingKeys.length) {
+ splitStopKey = algmImpl.lastRow();
+ } else {
+ splitStopKey = dividingKeys[i];
+ }
+ byte[] startRowOfFullRegion = getNearestFullRegion(regions, splitStartKey,
+ splitStopKey);
+ InputSplit split = new TableSplit(table.getTableName(),
+ splitStartKey, splitStopKey, getRegionServerAddress(table,
+ startRowOfFullRegion));
+ splits.add(split);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getSplits: split -> " + i + " -> " + split);
+ }
+ splitStartKey = splitStopKey;
+ }
+ }
+ HConnectionManager.deleteAllZookeeperConnections();
+ return splits;
+ }
+
+ private static byte[] getNearestFullRegion(
+ final TreeMap<byte[], byte[]> regions,
+ final byte[] startKey, final byte[] stopKey) {
+
+ Map.Entry<byte[], byte[]> region = regions.ceilingEntry(startKey);
+ if (region != null) {
+ if (Bytes.compareTo(region.getValue(), stopKey) < 0) {
+ // The given range spans at least one complete region. Return
+ // the start row for this region
+ return region.getKey();
+ }
+ }
+ return startKey;
+ }
+
private static synchronized void determineNameServer(Configuration conf) {
// Get the name server address and the default value is null.
if (nameServer == null) {
@@ -233,6 +315,24 @@ extends InputFormat<ImmutableBytesWritab
}
}
+ private static String getRegionServerAddress(final HTable table, final byte[] row)
+ throws IOException {
+ String regionLocation = null;
+ HServerAddress regionServerAddress =
+ table.getRegionLocation(row).getServerAddress();
+ InetAddress regionAddress = null;
+ try {
+ regionAddress =
+ regionServerAddress.getInetSocketAddress().getAddress();
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.error("Cannot resolve the host name for " + regionAddress +
+ " because of " + e);
+ regionLocation = regionServerAddress.getHostname();
+ }
+ return regionLocation;
+ }
+
private static String reverseDNS(InetAddress ipAddress)
throws NamingException {
String hostName = reverseDNSCacheMap.get(ipAddress);
@@ -327,7 +427,30 @@ extends InputFormat<ImmutableBytesWritab
throw new IllegalArgumentException("Expecting at least 1 mapper " +
"per region; instead got: " + num);
}
- numMappersPerRegion = num;
+ if (!mappersPerJob) {
+ numMappers = num;
+ } else {
+ LOG.warn("Ingoring mappersPerRegion config value as mappersPerJob is" +
+ " already set.");
+ }
+ }
+
+ /**
+ * Sets the number of mappers assigned for the job.
+ *
+ * @param num
+ * @throws IllegalArgumentException When <code>num</code> <= 0.
+ */
+ public void setNumMappersPerJob(int num) throws IllegalArgumentException {
+ if (num <= 0) {
+ throw new IllegalArgumentException("Expecting at least 1 mapper " +
+ "per job; instead got: " + num);
+ }
+ if (numMappers > 1) {
+ LOG.warn("Overriding num of mappers per region.");
+ }
+ numMappers = num;
+ mappersPerJob = true;
}
public void setSplitAlgorithm(String name) {