You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/31 20:52:58 UTC
svn commit: r1238774 -
/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
Author: tedyu
Date: Tue Jan 31 19:52:58 2012
New Revision: 1238774
URL: http://svn.apache.org/viewvc?rev=1238774&view=rev
Log:
HBASE-5259 Normalize the RegionLocation in TableInputFormat by the reverse DNS lookup (Liyin Tang)
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1238774&r1=1238773&r2=1238774&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Tue Jan 31 19:52:58 2012
@@ -20,11 +20,17 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import javax.naming.NamingException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -36,6 +42,7 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.DNS;
/**
* A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
@@ -78,6 +85,13 @@ extends InputFormat<ImmutableBytesWritab
private TableRecordReader tableRecordReader = null;
+ /** The reverse DNS lookup cache mapping: IPAddress => HostName */
+ private HashMap<InetAddress, String> reverseDNSCacheMap =
+ new HashMap<InetAddress, String>();
+
+ /** The NameServer address */
+ private String nameServer = null;
+
/**
* Builds a TableRecordReader. If no TableRecordReader was provided, uses
* the default.
@@ -128,6 +142,10 @@ extends InputFormat<ImmutableBytesWritab
if (table == null) {
throw new IOException("No table was provided.");
}
+ // Get the name server address and the default value is null.
+ this.nameServer =
+ context.getConfiguration().get("hbase.nameserver.address", null);
+
Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null ||
keys.getFirst().length == 0) {
@@ -138,13 +156,24 @@ extends InputFormat<ImmutableBytesWritab
if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue;
}
- String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
- getHostname();
- byte[] startRow = scan.getStartRow();
- byte[] stopRow = scan.getStopRow();
- // determine if the given start an stop key fall into the region
+ HServerAddress regionServerAddress =
+ table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
+ InetAddress regionAddress =
+ regionServerAddress.getInetSocketAddress().getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.error("Cannot resolve the host name for " + regionAddress +
+ " because of " + e);
+ regionLocation = regionServerAddress.getHostname();
+ }
+
+ byte[] startRow = scan.getStartRow();
+ byte[] stopRow = scan.getStopRow();
+ // determine if the given start an stop key fall into the region
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
- Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+ Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
(stopRow.length == 0 ||
Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
byte[] splitStart = startRow.length == 0 ||
@@ -164,6 +193,15 @@ extends InputFormat<ImmutableBytesWritab
}
return splits;
}
+
+ private String reverseDNS(InetAddress ipAddress) throws NamingException { // Resolves ipAddress to a host name, memoizing results per instance.
+ String hostName = this.reverseDNSCacheMap.get(ipAddress); // Check the IP => host name cache first.
+ if (hostName == null) { // Cache miss: nothing stored for this address yet.
+ hostName = DNS.reverseDns(ipAddress, this.nameServer); // Query via the configured name server; a NamingException propagates to the caller.
+ this.reverseDNSCacheMap.put(ipAddress, hostName); // Only reached on success, so failed lookups are not cached and are retried on the next call.
+ } // NOTE(review): the cache is a plain HashMap; presumably only used from one thread -- confirm.
+ return hostName; // Either the cached value or the freshly resolved one.
+ }
/**
*